This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new a6c7710276 ARTEMIS-5687 Close the AMQP session if the server session
closes
a6c7710276 is described below
commit a6c77102760f3b4d34fd8fa7517133937fed51e6
Author: Timothy Bish <[email protected]>
AuthorDate: Wed Oct 1 08:04:36 2025 -0400
ARTEMIS-5687 Close the AMQP session if the server session closes
If the server session tied to the AMQP session is closed via management
operation the server should close the proton session to cause an End
performative to be sent the the connected peer.
---
.../protocol/amqp/proton/AMQPSessionContext.java | 14 +++++
.../tests/integration/amqp/AmqpSessionTest.java | 64 +++++++++++++++++++++-
2 files changed, 77 insertions(+), 1 deletion(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
index 6cf201caef..e736208c98 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
@@ -181,6 +181,20 @@ public class AMQPSessionContext extends
ProtonInitializable {
logger.warn(e.getMessage(), e);
}
closed = true;
+
+ connection.runNow(() -> {
+ // Only close proactively if the connection and session are active to
avoid introducing
+ // changes in behaviors around normal remote close and some other
local close pathways
+ // that can trip over themselves if this closes a session on a closed
connection before
+ // a flush of the engine.
+ if (session.getConnection().getLocalState() == EndpointState.ACTIVE
&& session.getLocalState() == EndpointState.ACTIVE) {
+ try {
+ session.close();
+ } finally {
+ connection.flush();
+ }
+ }
+ });
}
public void removeReceiver(Receiver receiver) {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java
index 6e5eba8c10..54b17db764 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java
@@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.lang.invoke.MethodHandles;
+import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
@@ -32,12 +33,14 @@ import
org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AmqpSessionTest extends AmqpClientTestSupport {
+
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Test
@@ -87,7 +90,6 @@ public class AmqpSessionTest extends AmqpClientTestSupport {
connection.close();
}
-
@Test
@Timeout(60)
public void testCreateSessionProducerConsumerDoesNotLeakClosable() throws
Exception {
@@ -111,4 +113,64 @@ public class AmqpSessionTest extends AmqpClientTestSupport
{
connection.close();
}
+ @Test
+ public void testSessionClosedOnServerEndsClientSession() throws Exception {
+ doTestSessionClosedOnServerEndsClientSession(false, false);
+ }
+
+ @Test
+ public void testSessionClosedOnServerEndsClientSessionWithFailed() throws
Exception {
+ doTestSessionClosedOnServerEndsClientSession(true, false);
+ }
+
+ @Test
+ public void testSessionClosedOnServerEndsClientSessionWithFailedAndForced()
throws Exception {
+ doTestSessionClosedOnServerEndsClientSession(true, true);
+ }
+
+ @Test
+ public void testSessionClosedOnServerEndsClientSessionForced() throws
Exception {
+ doTestSessionClosedOnServerEndsClientSession(false, true);
+ }
+
+ public void doTestSessionClosedOnServerEndsClientSession(boolean failed,
boolean forced) throws Exception {
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ peer.queueClientSaslAnonymousConnect();
+ peer.connect("localhost", AMQP_PORT);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectOpen();
+ peer.expectBegin();
+ peer.expectAttach().ofReceiver();
+ peer.expectFlow().withLinkCredit(1000);
+
+ peer.remoteOpen().withContainerId("test-client").now();
+ peer.remoteBegin().now();
+ peer.remoteAttach().ofSender()
+ .withInitialDeliveryCount(0)
+ .withName("client-link")
+ .withSource().withAddress(getTestName())
+ .withCapabilities("queue").also()
+ .withTarget().also()
+ .now();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectEnd().respond(); // Server signals client of session close.
+
+ assertEquals(1, server.getSessions().size());
+
+ final ServerSession session = server.getSessions().iterator().next();
+
+ assertNotNull(session);
+
+ session.close(failed, forced); // Should trigger End frame.
+
+ assertEquals(0, server.getSessions().size());
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact