Repository: qpid-proton Updated Branches: refs/heads/0.12.x 8c6c7c531 -> 271b36363
PROTON-1100: also protect against an NPE that occurs if sender link has messages on it before the Open frame is sent (cherry picked from commit 6422e2497b62b46db9e993059bc514a53a8ed643) Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/193bcebd Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/193bcebd Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/193bcebd Branch: refs/heads/0.12.x Commit: 193bcebd72e0d7f6107732db7dbb2bc4704345f1 Parents: 8c6c7c5 Author: Robert Gemmell <[email protected]> Authored: Tue Jan 26 13:30:14 2016 +0000 Committer: Robert Gemmell <[email protected]> Committed: Tue Jan 26 14:16:22 2016 +0000 ---------------------------------------------------------------------- .../qpid/proton/engine/impl/TransportImpl.java | 2 +- .../proton/engine/impl/TransportImplTest.java | 108 ++++++++++++++++++- 2 files changed, 108 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193bcebd/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java index 93335b0..d85794f 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java @@ -489,7 +489,7 @@ public class TransportImpl extends EndpointImpl private void processTransportWork() { - if(_connectionEndpoint != null) + if(_connectionEndpoint != null && _isOpenSent) { DeliveryImpl delivery = _connectionEndpoint.getTransportWorkHead(); while(delivery != null) 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193bcebd/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java index a2e2c78..888f4af 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java @@ -29,22 +29,32 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.LinkedList; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.apache.qpid.proton.amqp.UnsignedShort; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.transport.Attach; import org.apache.qpid.proton.amqp.transport.Begin; +import org.apache.qpid.proton.amqp.transport.Flow; import org.apache.qpid.proton.amqp.transport.FrameBody; import org.apache.qpid.proton.amqp.transport.Open; +import org.apache.qpid.proton.amqp.transport.Role; +import org.apache.qpid.proton.amqp.transport.Transfer; +import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.TransportException; import org.apache.qpid.proton.framing.TransportFrame; +import org.apache.qpid.proton.message.Message; import org.junit.Rule; import 
org.junit.Test; import org.junit.rules.ExpectedException; @@ -58,6 +68,8 @@ public class TransportImplTest private static final TransportFrame TRANSPORT_FRAME_BEGIN = new TransportFrame(CHANNEL_ID, new Begin(), null); private static final TransportFrame TRANSPORT_FRAME_OPEN = new TransportFrame(CHANNEL_ID, new Open(), null); + private static final int BUFFER_SIZE = 4096; + @Rule public ExpectedException _expectedException = ExpectedException.none(); @@ -457,7 +469,7 @@ public class TransportImplTest * be pipelined together. */ @Test - public void testReceiverFlowWithoutOpen() + public void testReceiverFlowBeforeOpenConnection() { MockTransportImpl transport = new MockTransportImpl(); Connection connection = Proton.connection(); @@ -485,6 +497,76 @@ public class TransportImplTest assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin); } + @Test + public void testSenderSendBeforeOpenConnection() + { + MockTransportImpl transport = new MockTransportImpl(); + + Connection connection = Proton.connection(); + transport.bind(connection); + + Collector collector = Collector.Factory.create(); + connection.collect(collector); + + Session session = connection.session(); + session.open(); + + String linkName = "mySender"; + Sender sender = session.sender(linkName); + sender.open(); + + sendMessage(sender, "tag1", "content1"); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 0, transport.writes.size()); + + // Now open the connection, expect the Open and Begin and Attach frames but + // nothing else as the sender won't have credit yet. 
+ connection.open(); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size()); + + assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open); + assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin); + assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach); + + // Send the necessary responses to open/begin/attach then give sender credit + transport.handleFrame(new TransportFrame(0, new Open(), null)); + + Begin begin = new Begin(); + begin.setRemoteChannel(UnsignedShort.valueOf((short) 0)); + transport.handleFrame(new TransportFrame(0, begin, null)); + + Attach attach = new Attach(); + attach.setHandle(UnsignedInteger.ZERO); + attach.setRole(Role.RECEIVER); + attach.setName(linkName); + attach.setInitialDeliveryCount(UnsignedInteger.ZERO); + transport.handleFrame(new TransportFrame(0, attach, null)); + + Flow flow = new Flow(); + flow.setHandle(UnsignedInteger.ZERO); + flow.setDeliveryCount(UnsignedInteger.ZERO); + flow.setNextIncomingId(UnsignedInteger.ONE); + flow.setNextOutgoingId(UnsignedInteger.ZERO); + flow.setIncomingWindow(UnsignedInteger.valueOf(1024)); + flow.setOutgoingWindow(UnsignedInteger.valueOf(1024)); + flow.setLinkCredit(UnsignedInteger.valueOf(10)); + + transport.handleFrame(new TransportFrame(0, flow, null)); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size()); + + // Now pump the transport again and expect a transfer for the message + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size()); + assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Transfer); + } + private void pumpMockTransport(MockTransportImpl transport) { while(transport.pending() > 0) @@ -507,4 +589,28 @@ public class TransportImplTest return result; } } + + private 
Delivery sendMessage(Sender sender, String deliveryTag, String messageContent) + { + byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8); + + Message m = Message.Factory.create(); + m.setBody(new AmqpValue(messageContent)); + + byte[] encoded = new byte[BUFFER_SIZE]; + int len = m.encode(encoded, 0, BUFFER_SIZE); + + assertTrue("given array was too small", len < BUFFER_SIZE); + + Delivery delivery = sender.delivery(tag); + + int sent = sender.send(encoded, 0, len); + + assertEquals("sender unable to send all data at once as assumed for simplicity", len, sent); + + boolean senderAdvanced = sender.advance(); + assertTrue("sender has not advanced", senderAdvanced); + + return delivery; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
