This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new a6c7710276 ARTEMIS-5687 Close the AMQP session if the server session 
closes
a6c7710276 is described below

commit a6c77102760f3b4d34fd8fa7517133937fed51e6
Author: Timothy Bish <[email protected]>
AuthorDate: Wed Oct 1 08:04:36 2025 -0400

    ARTEMIS-5687 Close the AMQP session if the server session closes
    
    If the server session tied to the AMQP session is closed via management
    operation the server should close the proton session to cause an End
    performative to be sent the the connected peer.
---
 .../protocol/amqp/proton/AMQPSessionContext.java   | 14 +++++
 .../tests/integration/amqp/AmqpSessionTest.java    | 64 +++++++++++++++++++++-
 2 files changed, 77 insertions(+), 1 deletion(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
index 6cf201caef..e736208c98 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
@@ -181,6 +181,20 @@ public class AMQPSessionContext extends 
ProtonInitializable {
          logger.warn(e.getMessage(), e);
       }
       closed = true;
+
+      connection.runNow(() -> {
+         // Only close proactively if the connection and session are active to 
avoid introducing
+         // changes in behaviors around normal remote close and some other 
local close pathways
+         // that can trip over themselves if this closes a session on a closed 
connection before
+         // a flush of the engine.
+         if (session.getConnection().getLocalState() == EndpointState.ACTIVE 
&& session.getLocalState() == EndpointState.ACTIVE) {
+            try {
+               session.close();
+            } finally {
+               connection.flush();
+            }
+         }
+      });
    }
 
    public void removeReceiver(Receiver receiver) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java
index 6e5eba8c10..54b17db764 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java
@@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 import java.lang.invoke.MethodHandles;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
@@ -32,12 +33,14 @@ import 
org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.apache.activemq.transport.amqp.client.AmqpValidator;
 import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class AmqpSessionTest extends AmqpClientTestSupport {
+
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
    @Test
@@ -87,7 +90,6 @@ public class AmqpSessionTest extends AmqpClientTestSupport {
       connection.close();
    }
 
-
    @Test
    @Timeout(60)
    public void testCreateSessionProducerConsumerDoesNotLeakClosable() throws 
Exception {
@@ -111,4 +113,64 @@ public class AmqpSessionTest extends AmqpClientTestSupport 
{
       connection.close();
    }
 
+   @Test
+   public void testSessionClosedOnServerEndsClientSession() throws Exception {
+      doTestSessionClosedOnServerEndsClientSession(false, false);
+   }
+
+   @Test
+   public void testSessionClosedOnServerEndsClientSessionWithFailed() throws 
Exception {
+      doTestSessionClosedOnServerEndsClientSession(true, false);
+   }
+
+   @Test
+   public void testSessionClosedOnServerEndsClientSessionWithFailedAndForced() 
throws Exception {
+      doTestSessionClosedOnServerEndsClientSession(true, true);
+   }
+
+   @Test
+   public void testSessionClosedOnServerEndsClientSessionForced() throws 
Exception {
+      doTestSessionClosedOnServerEndsClientSession(false, true);
+   }
+
+   public void doTestSessionClosedOnServerEndsClientSession(boolean failed, 
boolean forced) throws Exception {
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         peer.queueClientSaslAnonymousConnect();
+         peer.connect("localhost", AMQP_PORT);
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         peer.expectOpen();
+         peer.expectBegin();
+         peer.expectAttach().ofReceiver();
+         peer.expectFlow().withLinkCredit(1000);
+
+         peer.remoteOpen().withContainerId("test-client").now();
+         peer.remoteBegin().now();
+         peer.remoteAttach().ofSender()
+                            .withInitialDeliveryCount(0)
+                            .withName("client-link")
+                            .withSource().withAddress(getTestName())
+                                         .withCapabilities("queue").also()
+                            .withTarget().also()
+                            .now();
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectEnd().respond(); // Server signals client of session close.
+
+         assertEquals(1, server.getSessions().size());
+
+         final ServerSession session = server.getSessions().iterator().next();
+
+         assertNotNull(session);
+
+         session.close(failed, forced); // Should trigger End frame.
+
+         assertEquals(0, server.getSessions().size());
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+      }
+   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to