PROTON-971: record the in-progress delivery on the transport link; don't send transfers for new deliveries until the existing one is completed
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a94e6351 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a94e6351 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a94e6351 Branch: refs/heads/go1 Commit: a94e635152e5ab32c3dbe5ea87f88173d01f4ce1 Parents: cfd73cd Author: Robert Gemmell <[email protected]> Authored: Tue Nov 3 12:11:51 2015 +0000 Committer: Robert Gemmell <[email protected]> Committed: Tue Nov 3 12:15:40 2015 +0000 ---------------------------------------------------------------------- .../qpid/proton/engine/impl/TransportImpl.java | 19 +++++- .../proton/engine/impl/TransportSender.java | 10 +++ tests/python/proton_tests/engine.py | 69 +++++++++++++++++++- 3 files changed, 96 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a94e6351/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java index 7faadc6..f318319 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java @@ -517,7 +517,7 @@ public class TransportImpl extends EndpointImpl private boolean processTransportWorkSender(DeliveryImpl delivery, SenderImpl snd) { - TransportLink<SenderImpl> tpLink = snd.getTransportLink(); + TransportSender tpLink = snd.getTransportLink(); SessionImpl session = snd.getSession(); TransportSession tpSession = session.getTransportSession(); @@ -529,6 +529,16 @@ public class TransportImpl extends EndpointImpl tpSession.isLocalChannelSet() && tpLink.getLocalHandle() != null && 
!_frameWriter.isFull()) { + DeliveryImpl inProgress = tpLink.getInProgressDelivery(); + if(inProgress != null){ + // There is an existing Delivery awaiting completion. Check it + // is the same Delivery object given and return if not, as we + // can't interleave Transfer frames for deliveries on a link. + if(inProgress != delivery) { + return false; + } + } + UnsignedInteger deliveryId = tpSession.getOutgoingDeliveryId(); TransportDelivery tpDelivery = new TransportDelivery(deliveryId, delivery, tpLink); delivery.setTransportDelivery(tpDelivery); @@ -575,6 +585,9 @@ public class TransportImpl extends EndpointImpl delivery.setDataLength(0); if (!transfer.getMore()) { + // Clear the in-progress delivery marker + tpLink.setInProgressDelivery(null); + delivery.setDone(); tpLink.setDeliveryCount(tpLink.getDeliveryCount().add(UnsignedInteger.ONE)); tpLink.setLinkCredit(tpLink.getLinkCredit().subtract(UnsignedInteger.ONE)); @@ -589,6 +602,10 @@ public class TransportImpl extends EndpointImpl delivery.setDataOffset(delivery.getDataOffset() + delta); delivery.setDataLength(payload.remaining()); session.incrementOutgoingBytes(-delta); + + // Remember the delivery we are still processing + // the body transfer frames for + tpLink.setInProgressDelivery(delivery); } if (snd.getLocalState() != EndpointState.CLOSED) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a94e6351/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSender.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSender.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSender.java index 26c39f5..cebe577 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSender.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSender.java @@ -27,6 +27,7 @@ import org.apache.qpid.proton.amqp.transport.Flow; class 
TransportSender extends TransportLink<SenderImpl> { private boolean _drain; + private DeliveryImpl _inProgressDelivery; private static final UnsignedInteger ORIGINAL_DELIVERY_COUNT = UnsignedInteger.ZERO; TransportSender(SenderImpl link) @@ -57,4 +58,13 @@ class TransportSender extends TransportLink<SenderImpl> setLinkCredit(linkCredit); } + public void setInProgressDelivery(DeliveryImpl inProgressDelivery) + { + _inProgressDelivery = inProgressDelivery; + } + + public DeliveryImpl getInProgressDelivery() + { + return _inProgressDelivery; + } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a94e6351/tests/python/proton_tests/engine.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/engine.py b/tests/python/proton_tests/engine.py index 7946d2c..0a6eb8d 100644 --- a/tests/python/proton_tests/engine.py +++ b/tests/python/proton_tests/engine.py @@ -1100,7 +1100,74 @@ class MaxFrameTransferTest(Test): binary = self.rcv.recv(1024) assert binary == None - + + def testSendQueuedMultiFrameMessages(self, sendSingleFrameMsg = False): + """ + Test that multiple queued messages on the same link + with multi-frame content are sent correctly. Use an + odd max frame size, send enough data to use many. 
+ """ + self.snd, self.rcv = self.link("test-link", max_frame=[0,517]) + self.c1 = self.snd.session.connection + self.c2 = self.rcv.session.connection + self.snd.open() + self.rcv.open() + self.pump() + assert self.rcv.session.connection.transport.max_frame_size == 517 + assert self.snd.session.connection.transport.remote_max_frame_size == 517 + + self.rcv.flow(5) + + self.pump() + + # Send a delivery with 5 frames, all bytes as X1234 + self.snd.delivery("tag") + msg = ("X1234" * 425).encode('utf-8') + assert 2125 == len(msg) + n = self.snd.send(msg) + assert n == len(msg) + assert self.snd.advance() + + # Send a delivery with 5 frames, all bytes as Y5678 + self.snd.delivery("tag2") + msg2 = ("Y5678" * 425).encode('utf-8') + assert 2125 == len(msg2) + n = self.snd.send(msg2) + assert n == len(msg2) + assert self.snd.advance() + + self.pump() + + if sendSingleFrameMsg: + # Send a delivery with 1 frame + self.snd.delivery("tag3") + msg3 = ("Z").encode('utf-8') + assert 1 == len(msg3) + n = self.snd.send(msg3) + assert n == len(msg3) + assert self.snd.advance() + self.pump() + + binary = self.rcv.recv(5000) + self.assertEqual(binary, msg) + + self.rcv.advance() + + binary2 = self.rcv.recv(5000) + self.assertEqual(binary2, msg2) + + self.rcv.advance() + + if sendSingleFrameMsg: + binary3 = self.rcv.recv(5000) + self.assertEqual(binary3, msg3) + self.rcv.advance() + + self.pump() + + def testSendQueuedMultiFrameMessagesThenSingleFrameMessage(self): + self.testSendQueuedMultiFrameMessages(sendSingleFrameMsg = True) + def testBigMessage(self): """ Test transfering a big message. --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
