This is an automated email from the ASF dual-hosted git repository. robbie pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton-j.git
commit 8c1f2326d46b9a67ae14bc3431acc6cddfbb7524 Author: Robbie Gemmell <rob...@apache.org> AuthorDate: Wed Mar 1 16:24:12 2023 +0000 PROTON-2688: clean up entries in transport work list for sender delivery that can no longer be sent --- .../qpid/proton/engine/impl/TransportImpl.java | 6 ++ .../qpid/proton/engine/impl/TransportLink.java | 11 +++ .../qpid/proton/engine/impl/TransportImplTest.java | 86 ++++++++++++++++++++++ 3 files changed, 103 insertions(+) diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java index da2e9abb..b9f07dc0 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java @@ -469,6 +469,7 @@ public class TransportImpl extends EndpointImpl } writeFrame(transportSession.getLocalChannel(), detach, null, null); + transportLink.sentDetach(); } endpoint.clearModified(); @@ -699,6 +700,11 @@ public class TransportImpl extends EndpointImpl writeFrame(tpSession.getLocalChannel(), cachedDisposition, null, null); } + if(!wasDone && tpLink != null && tpLink.detachSent()) { + // Too late to action this work, clear it. 
+ return true; + } + return !delivery.isBuffered(); } diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java index 836cf71c..0bd20cf1 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java @@ -37,6 +37,7 @@ class TransportLink<T extends LinkImpl> private UnsignedInteger _remoteLinkCredit; private boolean _detachReceived; private boolean _attachSent; + private boolean _detachSent; protected TransportLink(T link) { @@ -226,4 +227,14 @@ class TransportLink<T extends LinkImpl> { _remoteDeliveryCount = remoteDeliveryCount; } + + public boolean detachSent() + { + return _detachSent; + } + + public void sentDetach() + { + _detachSent = true; + } } diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java index 82ff4c81..87d845de 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java @@ -5109,4 +5109,90 @@ public class TransportImplTest assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size()); assertTrue("Unexpected frame type", transport.writes.get(5) instanceof Close); } + + /** + * Verify that the transport work list doesn't retain delivery (+link+session) objects when a sender link is + * closed and freed while there is a buffered/not-transferred message outstanding, as they can no longer be transferred. 
+ */ + @Test + public void testTransportWorkListDoesntLeakDeliveriesEtcFromSenderLinkFreedWithBufferedSend() + { + MockTransportImpl transport = new MockTransportImpl(); + ConnectionImpl connection = new ConnectionImpl(); + transport.bind(connection); + + connection.open(); + + Session session = connection.session(); + session.open(); + + String linkName = "myClientSender"; + Sender sender = session.sender(linkName); + sender.open(); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size()); + + assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open); + assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin); + assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach); + + assertNull("Should not yet have a delivery", sender.current()); + + // Send the necessary responses to open/begin/attach. DO NOT give any credit. + transport.handleFrame(new TransportFrame(0, new Open(), null)); + + Begin begin = new Begin(); + begin.setRemoteChannel(UnsignedShort.valueOf((short) 0)); + begin.setNextOutgoingId(UnsignedInteger.ONE); + begin.setIncomingWindow(UnsignedInteger.valueOf(1024)); + begin.setOutgoingWindow(UnsignedInteger.valueOf(1024)); + transport.handleFrame(new TransportFrame(0, begin, null)); + + Attach attach = new Attach(); + attach.setHandle(UnsignedInteger.ZERO); + attach.setRole(Role.RECEIVER); + attach.setName(linkName); + attach.setInitialDeliveryCount(UnsignedInteger.ZERO); + transport.handleFrame(new TransportFrame(0, attach, null)); + + assertEndpointState(sender, EndpointState.ACTIVE, EndpointState.ACTIVE); + assertEndpointState(session, EndpointState.ACTIVE, EndpointState.ACTIVE); + + assertEquals("Expected the sender to have no credit", 0, sender.getCredit()); + + Delivery delivery = sendMessage(sender, "tag1", "content1"); + + assertEquals("Expected the delivery to be on the transport work list", delivery, 
connection.getTransportWorkHead()); + assertEquals("Expected the sender to have queued message", 1 , sender.getQueued()); + + pumpMockTransport(transport); + + // Expect no more frames to have been sent, delivery cant be sent without credit + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size()); + + assertEquals("Expected the delivery to be on the transport work list", delivery, connection.getTransportWorkHead()); + + // Send a remote request to close the sender and action it + Detach detach = new Detach(); + detach.setHandle(UnsignedInteger.ZERO); + detach.setClosed(true); + transport.handleFrame(new TransportFrame(0, detach, null)); + + assertEndpointState(sender, EndpointState.ACTIVE, EndpointState.CLOSED); + + sender.close(); + sender.free(); + + pumpMockTransport(transport); + + assertEndpointState(sender, EndpointState.CLOSED, EndpointState.CLOSED); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size()); + assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Detach); + + // Check the delivery isnt in the work list as it clearly cant be sent now the sender is closed. + assertNull("Expected the transport work list to be empty", connection.getTransportWorkHead()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org