This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-j.git


The following commit(s) were added to refs/heads/main by this push:
     new 077bc2d2 PROTON-2687: clean up stale entries in transport work list 
that cant be actioned, and prevent them from entering where possible
077bc2d2 is described below

commit 077bc2d2d374a342ec5b82589486d10067a0b528
Author: Robbie Gemmell <rob...@apache.org>
AuthorDate: Thu Feb 23 20:02:41 2023 +0000

    PROTON-2687: clean up stale entries in transport work list that cant be 
actioned, and prevent them from entering where possible
---
 .../qpid/proton/engine/impl/DeliveryImpl.java      |   6 +
 .../qpid/proton/engine/impl/TransportImpl.java     |  16 +-
 .../qpid/proton/engine/impl/TransportSession.java  |  11 +
 .../qpid/proton/engine/impl/DeliveryImplTest.java  |   2 +
 .../qpid/proton/engine/impl/TransportImplTest.java | 222 +++++++++++++++++++++
 5 files changed, 255 insertions(+), 2 deletions(-)

diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
index 2928d376..432e8aa8 100644
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
@@ -291,6 +291,12 @@ public class DeliveryImpl implements Delivery
 
     void addToTransportWorkList()
     {
+        TransportSession transportSession = 
getLink().getSession().getTransportSession();
+        if (transportSession != null && transportSession.endSent()) {
+            // Too late to action this work, don't add it to the transport work 
list.
+            return;
+        }
+
         getLink().getConnectionImpl().addTransportWork(this);
     }
 
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index aa304965..6653bd79 100644
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -561,12 +561,18 @@ public class TransportImpl extends EndpointImpl
         SessionImpl session = snd.getSession();
         TransportSession tpSession = session.getTransportSession();
 
+        if (tpSession.endSent()) {
+            // Too late to action this work, clear it.
+            return true;
+        }
+
+        boolean localChannelSet = tpSession.isLocalChannelSet();
         boolean wasDone = delivery.isDone();
 
         if(!delivery.isDone() &&
            (delivery.getDataLength() > 0 || delivery != snd.current()) &&
            tpSession.hasOutgoingCredit() && tpLink.hasCredit() &&
-           tpSession.isLocalChannelSet() &&
+           localChannelSet &&
            tpLink.getLocalHandle() != null && !_frameWriter.isFull())
         {
             DeliveryImpl inProgress = tpLink.getInProgressDelivery();
@@ -676,7 +682,7 @@ public class TransportImpl extends EndpointImpl
             }
         }
 
-        if(wasDone && delivery.getLocalState() != null)
+        if(wasDone && delivery.getLocalState() != null && localChannelSet)
         {
             TransportDelivery tpDelivery = delivery.getTransportDelivery();
             // Use cached object as holder of data for immediate write to the 
FrameWriter
@@ -703,6 +709,11 @@ public class TransportImpl extends EndpointImpl
         SessionImpl session = rcv.getSession();
         TransportSession tpSession = session.getTransportSession();
 
+        if (tpSession.endSent()) {
+            // Too late to action this work, clear it.
+            return true;
+        }
+
         if (tpSession.isLocalChannelSet())
         {
             boolean settled = delivery.isSettled();
@@ -1055,6 +1066,7 @@ public class TransportImpl extends EndpointImpl
                         }
 
                         writeFrame(channel, end, null, null);
+                        transportSession.sentEnd();
                     }
 
                     endpoint.clearModified();
diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
index 0bab2912..d657ba95 100644
--- 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
+++ 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
@@ -70,6 +70,7 @@ class TransportSession
     private int _unsettledIncomingSize;
     private boolean _endReceived;
     private boolean _beginSent;
+    private boolean _endSent;
 
     TransportSession(TransportImpl transport, SessionImpl session)
     {
@@ -527,4 +528,14 @@ class TransportSession
     {
         _beginSent = true;
     }
+
+    public boolean endSent()
+    {
+        return _endSent;
+    }
+
+    public void sentEnd()
+    {
+        _endSent = true;
+    }
 }
diff --git 
a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/DeliveryImplTest.java
 
b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/DeliveryImplTest.java
index cd390ae1..18143af8 100644
--- 
a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/DeliveryImplTest.java
+++ 
b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/DeliveryImplTest.java
@@ -735,7 +735,9 @@ public class DeliveryImplTest
     private DeliveryImpl createSenderDelivery() {
         LinkImpl link = Mockito.mock(SenderImpl.class);
         ConnectionImpl connection = Mockito.mock(ConnectionImpl.class);
+        SessionImpl session = Mockito.mock(SessionImpl.class);
 
+        Mockito.when(link.getSession()).thenReturn(session);
         Mockito.when(link.getConnectionImpl()).thenReturn(connection);
 
         return new DeliveryImpl(null, link, null);
diff --git 
a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
 
b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index 4d65343b..56e518bf 100644
--- 
a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ 
b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -4895,4 +4895,226 @@ public class TransportImplTest
 
         assertNoEvents(collector);
     }
+
+    /**
+     * Verify that the transport work list doesn't retain 
deliveries+link+session when a session
+     * is closed and freed while there is an active receiver link with 
deliveries still outstanding.
+     */
+    @Test
+    public void 
testTransportWorkListDoesntLeakDeliveriesEtcFromSessionFreedWithActiveReceiverWithOutstandingDeliveries()
+    {
+        MockTransportImpl transport = new MockTransportImpl();
+        ConnectionImpl connection = new ConnectionImpl();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName = "myClientReceiver";
+        Receiver receiver = session.receiver(linkName);
+        receiver.flow(5);
+        receiver.open();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 4, transport.writes.size());
+
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof 
Open);
+        assertTrue("Unexpected frame type", transport.writes.get(1) instanceof 
Begin);
+        assertTrue("Unexpected frame type", transport.writes.get(2) instanceof 
Attach);
+        assertTrue("Unexpected frame type", transport.writes.get(3) instanceof 
Flow);
+
+        Delivery delivery = receiver.current();
+        assertNull("Should not yet have a delivery", delivery);
+
+        // Send the necessary responses to open/begin/attach as well as a 
transfer
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach = new Attach();
+        attach.setHandle(UnsignedInteger.ZERO);
+        attach.setRole(Role.SENDER);
+        attach.setName(linkName);
+        attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach, null));
+
+        assertEndpointState(receiver, EndpointState.ACTIVE, 
EndpointState.ACTIVE);
+        assertEndpointState(session, EndpointState.ACTIVE, 
EndpointState.ACTIVE);
+
+        String deliveryTag = "tag1";
+        String messageContent = "content1";
+        handleTransfer(transport, 1, deliveryTag, messageContent);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 4, transport.writes.size());
+
+        delivery = verifyDelivery(receiver, deliveryTag, messageContent);
+        assertNotNull("Should now have a delivery", delivery);
+
+        delivery.disposition(Accepted.getInstance());
+
+        assertEquals("Expected the delivery to be on the transport work list", 
delivery, connection.getTransportWorkHead());
+
+        pumpMockTransport(transport);
+
+        assertNull("Expected the delivery to cleared from the transport work 
list", connection.getTransportWorkHead());
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 5, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(4) instanceof 
Disposition);
+
+        session.close();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 6, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(5) instanceof 
End);
+
+        assertNull("Expected the transport work list to be empty", 
connection.getTransportWorkHead());
+        assertEndpointState(session, EndpointState.CLOSED, 
EndpointState.ACTIVE);
+
+        // Send the necessary responses to End
+        End end = new End();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, end, null));
+
+        assertEndpointState(session, EndpointState.CLOSED, 
EndpointState.CLOSED);
+
+        assertNull("Expected the transport work list to be empty", 
connection.getTransportWorkHead());
+
+        session.free();
+
+        pumpMockTransport(transport);
+
+        assertNull("Expected the transport work list to be empty", 
connection.getTransportWorkHead());
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 6, transport.writes.size());
+
+        connection.close();
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 7, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(6) instanceof 
Close);
+    }
+
+    /**
+     * Verify that the transport doesn't retain deliveries+link+session when a 
session is closed
+     * and freed while there is an active sender link with deliveries still 
outstanding.
+     */
+    @Test
+    public void 
testTransportWorkListDoesntLeakDeliveriesEtcFromSessionFreedWithActiveSenderWithOutstandingDeliveries()
+    {
+        MockTransportImpl transport = new MockTransportImpl();
+        ConnectionImpl connection = new ConnectionImpl();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName = "myClientSender";
+        Sender sender = session.sender(linkName);
+        sender.open();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 3, transport.writes.size());
+
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof 
Open);
+        assertTrue("Unexpected frame type", transport.writes.get(1) instanceof 
Begin);
+        assertTrue("Unexpected frame type", transport.writes.get(2) instanceof 
Attach);
+
+        assertNull("Should not yet have a delivery", sender.current());
+
+        // Send the necessary responses to open/begin
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        // Give the necessary response to attach for sender and grant some 
credit
+        Attach attach = new Attach();
+        attach.setHandle(UnsignedInteger.ZERO);
+        attach.setRole(Role.RECEIVER);
+        attach.setName(linkName);
+        attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach, null));
+
+        int credit = 10;
+        Flow flow = new Flow();
+        flow.setHandle(UnsignedInteger.ZERO);
+        flow.setDeliveryCount(UnsignedInteger.ZERO);
+        flow.setNextIncomingId(UnsignedInteger.ONE);
+        flow.setNextOutgoingId(UnsignedInteger.ZERO);
+        flow.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        flow.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        flow.setLinkCredit(UnsignedInteger.valueOf(credit));
+
+        transport.handleFrame(new TransportFrame(0, flow, null));
+
+        assertEndpointState(sender, EndpointState.ACTIVE, 
EndpointState.ACTIVE);
+        assertEndpointState(session, EndpointState.ACTIVE, 
EndpointState.ACTIVE);
+
+        assertEquals("Expected the sender to have credit", credit, 
sender.getCredit());
+
+        Delivery delivery = sendMessage(sender, "tag1", "content1");
+
+        assertEquals("Expected the delivery to be on the transport work list", 
delivery, connection.getTransportWorkHead());
+        assertEquals("Expected the sender to have 1 less credit", credit -1 , 
sender.getCredit());
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 4, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(3) instanceof 
Transfer);
+
+        assertNull("Expected the delivery to cleared from the transport work 
list", connection.getTransportWorkHead());
+
+        session.close();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 5, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(4) instanceof 
End);
+
+        assertNull("Expected the transport work list to be empty", 
connection.getTransportWorkHead());
+        assertEndpointState(session, EndpointState.CLOSED, 
EndpointState.ACTIVE);
+
+        // Send the necessary responses to End
+        End end = new End();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, end, null));
+
+        assertEndpointState(session, EndpointState.CLOSED, 
EndpointState.CLOSED);
+
+        assertNull("Expected the transport work list to be empty", 
connection.getTransportWorkHead());
+
+        session.free();
+
+        pumpMockTransport(transport);
+
+        assertNull("Expected the transport work list to be empty", 
connection.getTransportWorkHead());
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 5, transport.writes.size());
+
+        connection.close();
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + 
getFrameTypesWritten(transport), 6, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(5) instanceof 
Close);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to