This is an automated email from the ASF dual-hosted git repository. robbie pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton-j.git
The following commit(s) were added to refs/heads/main by this push: new 077bc2d2 PROTON-2687: clean up stale entries in transport work list that cant be actioned, and prevent them from entering where possible 077bc2d2 is described below commit 077bc2d2d374a342ec5b82589486d10067a0b528 Author: Robbie Gemmell <rob...@apache.org> AuthorDate: Thu Feb 23 20:02:41 2023 +0000 PROTON-2687: clean up stale entries in transport work list that cant be actioned, and prevent them from entering where possible --- .../qpid/proton/engine/impl/DeliveryImpl.java | 6 + .../qpid/proton/engine/impl/TransportImpl.java | 16 +- .../qpid/proton/engine/impl/TransportSession.java | 11 + .../qpid/proton/engine/impl/DeliveryImplTest.java | 2 + .../qpid/proton/engine/impl/TransportImplTest.java | 222 +++++++++++++++++++++ 5 files changed, 255 insertions(+), 2 deletions(-) diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java index 2928d376..432e8aa8 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java @@ -291,6 +291,12 @@ public class DeliveryImpl implements Delivery void addToTransportWorkList() { + TransportSession transportSession = getLink().getSession().getTransportSession(); + if (transportSession != null && transportSession.endSent()) { + // Too late to action this work, dont add it to the transport work list. + return; + } + getLink().getConnectionImpl().addTransportWork(this); } diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java index aa304965..6653bd79 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java @@ -561,12 +561,18 @@ public class TransportImpl extends EndpointImpl SessionImpl session = snd.getSession(); TransportSession tpSession = session.getTransportSession(); + if (tpSession.endSent()) { + // Too late to action this work, clear it. + return true; + } + + boolean localChannelSet = tpSession.isLocalChannelSet(); boolean wasDone = delivery.isDone(); if(!delivery.isDone() && (delivery.getDataLength() > 0 || delivery != snd.current()) && tpSession.hasOutgoingCredit() && tpLink.hasCredit() && - tpSession.isLocalChannelSet() && + localChannelSet && tpLink.getLocalHandle() != null && !_frameWriter.isFull()) { DeliveryImpl inProgress = tpLink.getInProgressDelivery(); @@ -676,7 +682,7 @@ public class TransportImpl extends EndpointImpl } } - if(wasDone && delivery.getLocalState() != null) + if(wasDone && delivery.getLocalState() != null && localChannelSet) { TransportDelivery tpDelivery = delivery.getTransportDelivery(); // Use cached object as holder of data for immediate write to the FrameWriter @@ -703,6 +709,11 @@ public class TransportImpl extends EndpointImpl SessionImpl session = rcv.getSession(); TransportSession tpSession = session.getTransportSession(); + if (tpSession.endSent()) { + // Too late to action this work, clear it. + return true; + } + if (tpSession.isLocalChannelSet()) { boolean settled = delivery.isSettled(); @@ -1055,6 +1066,7 @@ public class TransportImpl extends EndpointImpl } writeFrame(channel, end, null, null); + transportSession.sentEnd(); } endpoint.clearModified(); diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java index 0bab2912..d657ba95 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java @@ -70,6 +70,7 @@ class TransportSession private int _unsettledIncomingSize; private boolean _endReceived; private boolean _beginSent; + private boolean _endSent; TransportSession(TransportImpl transport, SessionImpl session) { @@ -527,4 +528,14 @@ class TransportSession { _beginSent = true; } + + public boolean endSent() + { + return _endSent; + } + + public void sentEnd() + { + _endSent = true; + } } diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/DeliveryImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/DeliveryImplTest.java index cd390ae1..18143af8 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/DeliveryImplTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/DeliveryImplTest.java @@ -735,7 +735,9 @@ public class DeliveryImplTest private DeliveryImpl createSenderDelivery() { LinkImpl link = Mockito.mock(SenderImpl.class); ConnectionImpl connection = Mockito.mock(ConnectionImpl.class); + SessionImpl session = Mockito.mock(SessionImpl.class); + Mockito.when(link.getSession()).thenReturn(session); Mockito.when(link.getConnectionImpl()).thenReturn(connection); return new DeliveryImpl(null, link, null); diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java index 4d65343b..56e518bf 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java @@ -4895,4 +4895,226 @@ public class TransportImplTest assertNoEvents(collector); } + + /** + * Verify that the transport work list doesnt retain deliveries+link+session on when a session + * is closed and freed while there is an active receiver link with deliveries still outstanding. + */ + @Test + public void testTransportWorkListDoesntLeakDeliveriesEtcFromSessionFreedWithActiveReceiverWithOutstandingDeliveries() + { + MockTransportImpl transport = new MockTransportImpl(); + ConnectionImpl connection = new ConnectionImpl(); + transport.bind(connection); + + connection.open(); + + Session session = connection.session(); + session.open(); + + String linkName = "myClientReceiver"; + Receiver receiver = session.receiver(linkName); + receiver.flow(5); + receiver.open(); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size()); + + assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open); + assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin); + assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach); + assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Flow); + + Delivery delivery = receiver.current(); + assertNull("Should not yet have a delivery", delivery); + + // Send the necessary responses to open/begin/attach as well as a transfer + transport.handleFrame(new TransportFrame(0, new Open(), null)); + + Begin begin = new Begin(); + begin.setRemoteChannel(UnsignedShort.valueOf((short) 0)); + begin.setNextOutgoingId(UnsignedInteger.ONE); + begin.setIncomingWindow(UnsignedInteger.valueOf(1024)); + begin.setOutgoingWindow(UnsignedInteger.valueOf(1024)); + transport.handleFrame(new TransportFrame(0, begin, null)); + + Attach attach = new Attach(); + attach.setHandle(UnsignedInteger.ZERO); + attach.setRole(Role.SENDER); + attach.setName(linkName); + attach.setInitialDeliveryCount(UnsignedInteger.ZERO); + transport.handleFrame(new TransportFrame(0, attach, null)); + + assertEndpointState(receiver, EndpointState.ACTIVE, EndpointState.ACTIVE); + assertEndpointState(session, EndpointState.ACTIVE, EndpointState.ACTIVE); + + String deliveryTag = "tag1"; + String messageContent = "content1"; + handleTransfer(transport, 1, deliveryTag, messageContent); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size()); + + delivery = verifyDelivery(receiver, deliveryTag, messageContent); + assertNotNull("Should now have a delivery", delivery); + + delivery.disposition(Accepted.getInstance()); + + assertEquals("Expected the delivery to be on the transport work list", delivery, connection.getTransportWorkHead()); + + pumpMockTransport(transport); + + assertNull("Expected the delivery to cleared from the transport work list", connection.getTransportWorkHead()); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size()); + assertTrue("Unexpected frame type", transport.writes.get(4) instanceof Disposition); + + session.close(); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size()); + assertTrue("Unexpected frame type", transport.writes.get(5) instanceof End); + + assertNull("Expected the transport work list to be empty", connection.getTransportWorkHead()); + assertEndpointState(session, EndpointState.CLOSED, EndpointState.ACTIVE); + + // Send the necessary responses to End + End end = new End(); + begin.setRemoteChannel(UnsignedShort.valueOf((short) 0)); + begin.setNextOutgoingId(UnsignedInteger.ONE); + begin.setIncomingWindow(UnsignedInteger.valueOf(1024)); + begin.setOutgoingWindow(UnsignedInteger.valueOf(1024)); + transport.handleFrame(new TransportFrame(0, end, null)); + + assertEndpointState(session, EndpointState.CLOSED, EndpointState.CLOSED); + + assertNull("Expected the transport work list to be empty", connection.getTransportWorkHead()); + + session.free(); + + pumpMockTransport(transport); + + assertNull("Expected the transport work list to be empty", connection.getTransportWorkHead()); + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size()); + + connection.close(); + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 7, transport.writes.size()); + assertTrue("Unexpected frame type", transport.writes.get(6) instanceof Close); + } + + /** + * Verify that the transport doesnt retain deliveries+link+session when a session is closed + * and freed while there is an active sender link with deliveries still outstanding. + */ + @Test + public void testTransportWorkListDoesntLeakDeliveriesEtcFromSessionFreedWithActiveSenderWithOutstandingDeliveries() + { + MockTransportImpl transport = new MockTransportImpl(); + ConnectionImpl connection = new ConnectionImpl(); + transport.bind(connection); + + connection.open(); + + Session session = connection.session(); + session.open(); + + String linkName = "myClientSender"; + Sender sender = session.sender(linkName); + sender.open(); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size()); + + assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open); + assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin); + assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach); + + assertNull("Should not yet have a delivery", sender.current()); + + // Send the necessary responses to open/begin/attach as well as a transfer + transport.handleFrame(new TransportFrame(0, new Open(), null)); + + Begin begin = new Begin(); + begin.setRemoteChannel(UnsignedShort.valueOf((short) 0)); + begin.setNextOutgoingId(UnsignedInteger.ONE); + begin.setIncomingWindow(UnsignedInteger.valueOf(1024)); + begin.setOutgoingWindow(UnsignedInteger.valueOf(1024)); + transport.handleFrame(new TransportFrame(0, begin, null)); + + // Give the necessary response to attach for sender and grant some credit + Attach attach = new Attach(); + attach.setHandle(UnsignedInteger.ZERO); + attach.setRole(Role.RECEIVER); + attach.setName(linkName); + attach.setInitialDeliveryCount(UnsignedInteger.ZERO); + transport.handleFrame(new TransportFrame(0, attach, null)); + + int credit = 10; + Flow flow = new Flow(); + flow.setHandle(UnsignedInteger.ZERO); + flow.setDeliveryCount(UnsignedInteger.ZERO); + flow.setNextIncomingId(UnsignedInteger.ONE); + flow.setNextOutgoingId(UnsignedInteger.ZERO); + flow.setIncomingWindow(UnsignedInteger.valueOf(1024)); + flow.setOutgoingWindow(UnsignedInteger.valueOf(1024)); + flow.setLinkCredit(UnsignedInteger.valueOf(credit)); + + transport.handleFrame(new TransportFrame(0, flow, null)); + + assertEndpointState(sender, EndpointState.ACTIVE, EndpointState.ACTIVE); + assertEndpointState(session, EndpointState.ACTIVE, EndpointState.ACTIVE); + + assertEquals("Expected the sender to have credit", credit, sender.getCredit()); + + Delivery delivery = sendMessage(sender, "tag1", "content1"); + + assertEquals("Expected the delivery to be on the transport work list", delivery, connection.getTransportWorkHead()); + assertEquals("Expected the sender to have 1 less credit", credit -1 , sender.getCredit()); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size()); + assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Transfer); + + assertNull("Expected the delivery to cleared from the transport work list", connection.getTransportWorkHead()); + + session.close(); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size()); + assertTrue("Unexpected frame type", transport.writes.get(4) instanceof End); + + assertNull("Expected the transport work list to be empty", connection.getTransportWorkHead()); + assertEndpointState(session, EndpointState.CLOSED, EndpointState.ACTIVE); + + // Send the necessary responses to End + End end = new End(); + begin.setRemoteChannel(UnsignedShort.valueOf((short) 0)); + begin.setNextOutgoingId(UnsignedInteger.ONE); + begin.setIncomingWindow(UnsignedInteger.valueOf(1024)); + begin.setOutgoingWindow(UnsignedInteger.valueOf(1024)); + transport.handleFrame(new TransportFrame(0, end, null)); + + assertEndpointState(session, EndpointState.CLOSED, EndpointState.CLOSED); + + assertNull("Expected the transport work list to be empty", connection.getTransportWorkHead()); + + session.free(); + + pumpMockTransport(transport); + + assertNull("Expected the transport work list to be empty", connection.getTransportWorkHead()); + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size()); + + connection.close(); + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size()); + assertTrue("Unexpected frame type", transport.writes.get(5) instanceof Close); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org