This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 451d03fd75cbb8d49445851beab5b9ef40f6de70
Author: Domenico Francesco Bruscino <[email protected]>
AuthorDate: Sat Jun 24 09:43:41 2023 +0200

    ARTEMIS-4332 Add management method to close stuck server sessions
    
    In rare cases a store operation could silently fails or starves, blocking 
the
    related server session and all delivering messages. Those server sessions 
can
    be closed adding a management method that cleans their operation context
    before closing them.
---
 .../api/core/management/ActiveMQServerControl.java |  8 ++++
 .../management/impl/ActiveMQServerControlImpl.java |  7 ++-
 .../artemis/core/persistence/OperationContext.java |  4 ++
 .../impl/journal/OperationContextImpl.java         | 23 +++++++++
 .../artemis/core/server/ServerSession.java         |  2 +
 .../core/server/impl/ServerSessionImpl.java        |  9 ++++
 .../management/ActiveMQServerControlTest.java      | 54 ++++++++++++++++++++++
 .../ActiveMQServerControlUsingCoreTest.java        |  5 ++
 8 files changed, 111 insertions(+), 1 deletion(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 0c4289fb52..95ef0c0db2 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -1240,6 +1240,14 @@ public interface ActiveMQServerControl {
    boolean closeSessionWithID(@Parameter(desc = "The connection ID", name = 
"connectionID") String connectionID,
                               @Parameter(desc = "The session ID", name = "ID") 
String ID) throws Exception;
 
+   /**
+    * Closes the session with the given id.
+    */
+   @Operation(desc = "Closes the session with the id", impact = 
MBeanOperationInfo.INFO)
+   boolean closeSessionWithID(@Parameter(desc = "The connection ID", name = 
"connectionID") String connectionID,
+      @Parameter(desc = "The session ID", name = "ID") String ID,
+      @Parameter(desc = "Force session close cancelling pending tasks", name = 
"force") boolean force) throws Exception;
+
    /**
     * Closes the consumer with the given id.
     */
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 1096ebee4f..bec69e4398 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -2381,6 +2381,11 @@ public class ActiveMQServerControlImpl extends 
AbstractControl implements Active
 
    @Override
    public boolean closeSessionWithID(final String connectionID, final String 
ID) throws Exception {
+      return closeSessionWithID(connectionID, ID, false);
+   }
+
+   @Override
+   public boolean closeSessionWithID(final String connectionID, final String 
ID, final boolean force) throws Exception {
       // possibly a long running task
       try (AutoCloseable lock = server.managementLock()) {
          if (AuditLogger.isBaseLoggingEnabled()) {
@@ -2393,7 +2398,7 @@ public class ActiveMQServerControlImpl extends 
AbstractControl implements Active
             List<ServerSession> sessions = server.getSessions(connectionID);
             for (ServerSession session : sessions) {
                if (session.getName().equals(ID.toString())) {
-                  session.close(true);
+                  session.close(true, force);
                   return true;
                }
             }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java
index 104a79c257..57ba7eca7a 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java
@@ -61,4 +61,8 @@ public interface OperationContext extends IOCompletion {
     * @throws Exception
     */
    boolean waitCompletion(long timeout) throws Exception;
+
+   default void clear() {
+
+   }
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
index 0a17c8d2d8..ceab9c0c1f 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
@@ -447,4 +447,27 @@ public class OperationContextImpl implements 
OperationContext {
          executorsPendingField +
          "]";
    }
+
+   @Override
+   public synchronized void clear() {
+      stored = 0;
+      storeLineUpField = 0;
+      minimalReplicated = 0;
+      replicated = 0;
+      replicationLineUpField = 0;
+      paged = 0;
+      minimalPage = 0;
+      pageLineUpField = 0;
+      errorCode = -1;
+      errorMessage = null;
+      executorsPendingField = 0;
+
+      if (tasks != null) {
+         tasks.clear();
+      }
+
+      if (storeOnlyTasks != null) {
+         storeOnlyTasks.clear();
+      }
+   }
 }
\ No newline at end of file
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index d02851ee8f..9357138b1f 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -372,6 +372,8 @@ public interface ServerSession extends SecurityAuth {
 
    void close(boolean failed) throws Exception;
 
+   void close(boolean failed, boolean force) throws Exception;
+
    void setTransferring(boolean transferring);
 
    Set<ServerConsumer> getServerConsumers();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 3284654049..0833fd81ba 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1707,9 +1707,18 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
 
    @Override
    public void close(final boolean failed) {
+      close(failed, false);
+   }
+
+   @Override
+   public void close(final boolean failed, final boolean force) {
       if (closed)
          return;
 
+      if (force) {
+         context.clear();
+      }
+
       context.executeOnCompletion(new IOCallback() {
          @Override
          public void onError(int errorCode, String errorMessage) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
index b6585dbcc4..9f2f4be22d 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
@@ -5805,6 +5805,60 @@ public class ActiveMQServerControlTest extends 
ManagementTestBase {
       
Assert.assertTrue(((org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer)JMSclient).isClosed());
    }
 
+   @Test
+   public void testForceCloseSession() throws Exception {
+      testForceCloseSession(false, false);
+   }
+
+   @Test
+   public void testForceCloseSessionWithError() throws Exception {
+      testForceCloseSession(true, false);
+   }
+
+   @Test
+   public void testForceCloseSessionWithPendingStoreOperation() throws 
Exception {
+      testForceCloseSession(false, true);
+   }
+
+   private void testForceCloseSession(boolean error, boolean 
pendingStoreOperation) throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString name = RandomUtil.randomSimpleString();
+      boolean durable = true;
+
+      ActiveMQServerControl serverControl = createManagementControl();
+
+      checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, 
name, RoutingType.ANYCAST));
+      serverControl.createAddress(address.toString(), "ANYCAST");
+      if (legacyCreateQueue) {
+         serverControl.createQueue(address.toString(), "ANYCAST", 
name.toString(), null, durable, -1, false, false);
+      } else {
+         serverControl.createQueue(new 
QueueConfiguration(name).setAddress(address).setRoutingType(RoutingType.ANYCAST).setDurable(durable).setAutoCreateAddress(false).toJSON());
+      }
+
+      ServerLocator receiveLocator = 
createInVMNonHALocator().setCallTimeout(500);
+      ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator);
+      ClientSession receiveClientSession = receiveCsf.createSession(true, 
false, false);
+      final ClientConsumer clientConsumer = 
receiveClientSession.createConsumer(name);
+
+      Assert.assertEquals(1, server.getSessions().size());
+
+      ServerSession serverSession = server.getSessions().iterator().next();
+      Assert.assertEquals(((ClientSessionImpl)receiveClientSession).getName(), 
serverSession.getName());
+
+      if (error) {
+         serverSession.getSessionContext().onError(0, "error");
+      }
+
+      if (pendingStoreOperation) {
+         serverSession.getSessionContext().storeLineUp();
+      }
+
+      
serverControl.closeSessionWithID(serverSession.getConnectionID().toString(), 
serverSession.getName(), true);
+
+      Wait.assertTrue(() -> serverSession.getServerConsumers().size() == 0, 
500);
+      Wait.assertTrue(() -> server.getSessions().size() == 0, 500);
+   }
+
    @Test
    public void testAddUser() throws Exception {
       ActiveMQServerControl serverControl = createManagementControl();
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index fd1fee8be2..8ae3599eaa 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -159,6 +159,11 @@ public class ActiveMQServerControlUsingCoreTest extends 
ActiveMQServerControlTes
             return (Boolean) proxy.invokeOperation("closeSessionWithID", 
connectionID, ID);
          }
 
+         @Override
+         public boolean closeSessionWithID(String connectionID, String ID, 
boolean force) throws Exception {
+            return (Boolean) proxy.invokeOperation("closeSessionWithID", 
connectionID, ID, force);
+         }
+
          @Override
          public boolean closeConsumerWithID(String sessionID, String ID) 
throws Exception {
             return (Boolean) proxy.invokeOperation("closeConsumerWithID", 
sessionID, ID);

Reply via email to