This is an automated email from the ASF dual-hosted git repository.

brusdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new c9d9ace4c6 ARTEMIS-5694 CriticalAnalyzer to detect session.close 
timing out.
c9d9ace4c6 is described below

commit c9d9ace4c6c6b654f01c75519e2e524389009e75
Author: Clebert Suconic <[email protected]>
AuthorDate: Fri Oct 3 12:16:31 2025 -0400

    ARTEMIS-5694 CriticalAnalyzer to detect session.close timing out.
    
    Also fixing a few cases where a context would hang
---
 .../utils/critical/CriticalComponentImpl.java      |  14 ++
 .../artemis/core/journal/impl/JournalImpl.java     |  29 ++--
 .../artemis/core/persistence/StorageManager.java   |   2 -
 .../journal/AbstractJournalStorageManager.java     |  10 --
 .../impl/nullpm/NullStorageManager.java            |   4 -
 .../core/server/impl/ServerSessionImpl.java        |  85 ++++++++----
 .../core/transaction/impl/TransactionImplTest.java |   5 -
 .../core/journal/impl/JournalImplTestBase.java     |  16 ++-
 .../tests/integration/client/SendAckFailTest.java  |   5 -
 .../integration/journal/NIOJournalCompactTest.java |  43 ++++++
 .../persistence/SessionCloseTimeoutTest.java       | 149 +++++++++++++++++++++
 .../performance/journal/JournalImplTestUnit.java   | 111 +++++++++++++++
 12 files changed, 411 insertions(+), 62 deletions(-)

diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java
index 9daa4763c8..5ec13a4d89 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java
@@ -55,6 +55,19 @@ public class CriticalComponentImpl implements 
CriticalComponent {
       }
    }
 
+   protected void enterCritical(int path) {
+      if (analyzer.isMeasuring()) {
+         measures[path].enterCritical();
+      }
+   }
+
+   protected void leaveCritical(int path) {
+      if (analyzer.isMeasuring()) {
+         measures[path].leaveCritical();
+      }
+   }
+
+
    @Override
    public boolean checkExpiration(long timeout, boolean reset) {
       for (int i = 0; i < measures.length; i++) {
@@ -64,4 +77,5 @@ public class CriticalComponentImpl implements 
CriticalComponent {
       }
       return false;
    }
+
 }
diff --git 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index ed57e385e2..1d67bce88c 100644
--- 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -919,7 +919,6 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
                                final boolean sync,
                                final IOCompletion callback) throws Exception {
       checkJournalIsLoaded();
-      lineUpContext(callback);
 
       if (logger.isTraceEnabled()) {
          logger.trace("scheduling appendAddRecord::id={}, userRecordType={}, 
record = {}", id, recordType, record);
@@ -930,6 +929,8 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
 
       checkRecordSize(addRecordEncodeSize, record);
 
+      lineUpContext(callback);
+
       final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, 
callback);
       appendExecutor.execute(() -> {
          journalLock.readLock().lock();
@@ -966,7 +967,6 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
                               final boolean sync,
                               final IOCompletion callback) throws Exception {
       checkJournalIsLoaded();
-      lineUpContext(callback);
 
       if (logger.isTraceEnabled()) {
          logger.trace("scheduling appendAddEvent::id={}, userRecordType={}, 
record = {}", id, recordType, record);
@@ -976,6 +976,8 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
 
       checkRecordSize(addRecord.getEncodeSize(), record);
 
+      lineUpContext(callback);
+
       final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, 
callback);
       appendExecutor.execute(() -> {
          journalLock.readLock().lock();
@@ -1022,7 +1024,6 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
                                   final boolean sync,
                                   final IOCompletion callback) throws 
Exception {
       checkJournalIsLoaded();
-      lineUpContext(callback);
 
       if (logger.isTraceEnabled()) {
          logger.trace("scheduling appendUpdateRecord::id={}, 
userRecordType={}", id, recordType);
@@ -1036,6 +1037,8 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
          onFoundAddInfo = new SimpleFutureImpl<>();
       }
 
+      lineUpContext(callback);
+
       if (onFoundAddInfo == null) {
          internalAppendUpdateRecord(id, recordType, persister, record, false, 
false, null, callback);
       } else {
@@ -1142,7 +1145,6 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
       }
 
       checkJournalIsLoaded();
-      lineUpContext(callback);
 
       final SimpleFuture<Boolean> onFoundAddInfo;
 
@@ -1152,6 +1154,8 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
          onFoundAddInfo = new SimpleFutureImpl<>();
       }
 
+      lineUpContext(callback);
+
       if (onFoundAddInfo == null) {
          internalAppendDeleteRecord(id, false, null, callback);
       } else {
@@ -1448,13 +1452,13 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
       }
    }
 
-   private void setErrorCondition(IOCallback otherCallback, JournalTransaction 
jt, Throwable t) {
+   private void setErrorCondition(IOCallback ioCallback, JournalTransaction 
jt, Throwable t) {
       if (jt != null) {
          jt.onError(ActiveMQExceptionType.IO_ERROR.getCode(), t.getMessage());
       }
 
-      if (otherCallback != null) {
-         otherCallback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), 
t.getMessage());
+      if (ioCallback != null) {
+         ioCallback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), 
t.getMessage());
       }
    }
 
@@ -1467,9 +1471,6 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
                                   final IOCompletion callback,
                                   final boolean lineUpContext) throws 
Exception {
       checkJournalIsLoaded();
-      if (lineUpContext) {
-         lineUpContext(callback);
-      }
 
       if (logger.isTraceEnabled()) {
          logger.trace("scheduling appendCommitRecord::txID={}", txID);
@@ -1480,6 +1481,9 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
          txcheck.checkErrorCondition();
       }
 
+      if (lineUpContext) {
+         lineUpContext(callback);
+      }
 
       final SimpleFuture<JournalTransaction> result = 
newSyncAndCallbackResult(sync, callback);
 
@@ -3523,4 +3527,9 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
    public int getCompactCount() {
       return compactCount;
    }
+
+   public void markTXError(long txID, Throwable t) {
+      JournalTransaction tx = transactions.get(txID);
+      tx.onError(-1, t.getMessage());
+   }
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index a4a11359e0..9ff3f8ea38 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -117,8 +117,6 @@ public interface StorageManager extends MapStorageManager, 
IDGenerator, ActiveMQ
     */
    OperationContext getContext();
 
-   void lineUpContext();
-
    /**
     * It just creates an OperationContext without associating it
     */
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index c3fc7653ae..4f6fbe0e19 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1763,16 +1763,6 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
       persistedKeyValuePairs.put(keyValuePair.getKey(), keyValuePair);
    }
 
-   @Override
-   public void lineUpContext() {
-      try (ArtemisCloseable lock = closeableReadLock()) {
-         messageJournal.lineUpContext(getContext());
-      }
-   }
-
-   // ActiveMQComponent implementation
-   // ------------------------------------------------------
-
    protected abstract void beforeStart() throws Exception;
 
    @Override
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index 89f084685b..47daeae68b 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -647,10 +647,6 @@ public class NullStorageManager implements StorageManager {
    public void commit(final long txID, final boolean lineUpContext) throws 
Exception {
    }
 
-   @Override
-   public void lineUpContext() {
-   }
-
    @Override
    public void deletePendingLargeMessage(final long recordID) throws Exception 
{
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 4c112e5990..1808ba85e2 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -108,6 +108,7 @@ import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.apache.activemq.artemis.utils.JsonLoader;
 import org.apache.activemq.artemis.utils.PrefixUtil;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
+import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
 import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
 import org.apache.activemq.artemis.utils.runnables.RunnableList;
 import org.slf4j.Logger;
@@ -116,10 +117,12 @@ import org.slf4j.LoggerFactory;
 /**
  * Server side Session implementation
  */
-public class ServerSessionImpl implements ServerSession, FailureListener {
+public class ServerSessionImpl extends CriticalComponentImpl implements 
ServerSession, FailureListener {
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+   private static final int CRITICAL_PATH_CLOSE = 0;
+
    private boolean securityEnabled = true;
 
    private final String securityDomain;
@@ -202,6 +205,9 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
    // server. Both the request and failure listener will
    // try to close one session from different threads
    // concurrently.
+   private volatile boolean closing = false;
+
+   // When the doClose is called, we will make it actually closed
    private volatile boolean closed = false;
 
    private boolean prefixEnabled = false;
@@ -237,6 +243,8 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
                             final Map<SimpleString, RoutingType> prefixes,
                             final String securityDomain,
                             boolean isLegacyProducer) throws Exception {
+      super(server.getCriticalAnalyzer(), 1);
+
       this.username = username;
 
       this.password = password;
@@ -339,7 +347,7 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
 
    @Override
    public boolean isClosed() {
-      return closed;
+      return closing;
    }
 
    @Override
@@ -404,6 +412,10 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
          callback.close(failed);
       }
       synchronized (this) {
+         if (closed) {
+            return;
+         }
+         closed = true;
          if (server.hasBrokerSessionPlugins()) {
             server.callBrokerSessionPlugins(plugin -> 
plugin.beforeCloseSession(this, failed));
          }
@@ -609,7 +621,7 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
 
       ServerConsumer consumer;
       synchronized (this) {
-         if (closed) {
+         if (closing) {
             throw 
ActiveMQMessageBundle.BUNDLE.cannotCreateConsumerOnClosedSession(queueName);
          }
          consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) 
binding, filter, priority, started, browseOnly, storageManager, callback, 
preAcknowledge, strictUpdateDeliveryCount, managementService, 
supportLargeMessage, credits, server);
@@ -1777,37 +1789,47 @@ public class ServerSessionImpl implements 
ServerSession, FailureListener {
    @Override
    public void close(final boolean failed, final boolean force) {
       synchronized (this) {
-         if (closed) {
+         if (closing) {
             return;
          }
-         closed = true;
+         closing = true;
       }
 
       if (force) {
          context.reset();
       }
 
+      // We only add the session as component on the critical analyzer
+      // while the close is happening between the user's thread and the 
context's thread.
+      // Once finishClose is called to complete the operation, leaveCritical 
is called
+      // and the session is removed from the component's list on the critical 
analyzer.
+      enterCritical(CRITICAL_PATH_CLOSE);
+      getCriticalAnalyzer().add(this);
+
       context.executeOnCompletion(new IOCallback() {
          @Override
          public void onError(int errorCode, String errorMessage) {
-            callDoClose();
+            finishClose(failed);
          }
 
          @Override
          public void done() {
-            callDoClose();
-         }
-
-         private void callDoClose() {
-            try {
-               doClose(failed);
-            } catch (Exception e) {
-               ActiveMQServerLogger.LOGGER.errorClosingSession(e);
-            }
+            finishClose(failed);
          }
       });
    }
 
+   private void finishClose(boolean failed) {
+      try {
+         doClose(failed);
+      } catch (Exception e) {
+         ActiveMQServerLogger.LOGGER.errorClosingSession(e);
+      } finally {
+         leaveCritical(CRITICAL_PATH_CLOSE);
+         getCriticalAnalyzer().remove(this);
+      }
+   }
+
    @Override
    public void closeConsumer(final long consumerID) throws Exception {
       final ServerConsumer consumer = locateConsumer(consumerID);
@@ -2192,6 +2214,30 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
    @Override
    public String toString() {
       StringBuilder sb = new StringBuilder();
+      try {
+         long txID;
+         if (tx != null) {
+            txID = tx.getID();
+         } else {
+            txID = -1L;
+         }
+         sb.append("name=").append(name).append(",");
+         sb.append("consumers=").append(consumers.size()).append(",");
+         sb.append("txID=").append(txID).append(",");
+         sb.append("remotingConnection=").append(remotingConnection);
+      } catch (Throwable justLogit) {
+         logger.debug(justLogit.getMessage(), justLogit);
+      }
+
+      insertMetadata(sb);
+
+      // This will actually appear on some management operations
+      // so please don't clog this with debug objects
+      // unless you provide a special way for management to translate sessions
+      return "ServerSessionImpl(" + sb + ")";
+   }
+
+   private void insertMetadata(StringBuilder sb) {
       if (this.metaData != null) {
          for (Map.Entry<String, String> value : metaData.entrySet()) {
             if (!sb.isEmpty()) {
@@ -2205,22 +2251,15 @@ public class ServerSessionImpl implements 
ServerSession, FailureListener {
             }
          }
       }
-      // This will actually appear on some management operations
-      // so please don't clog this with debug objects
-      // unless you provide a special way for management to translate sessions
-      return "ServerSessionImpl(" + sb.toString() + ")";
    }
 
-   // FailureListener implementation
-   // --------------------------------------------------------------------
-
    @Override
    public void connectionFailed(final ActiveMQException me, boolean 
failedOver) {
       /*
        * This can be invoked from Netty (via channelInactive) when the 
connection has already been closed causing
        * spurious logging about clearing up resources for failed client 
connections.
        */
-      if (closed)
+      if (closing)
          return;
 
       try {
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index 0430348967..524f765628 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -332,11 +332,6 @@ public class TransactionImplTest extends ServerTestBase {
          return null;
       }
 
-      @Override
-      public void lineUpContext() {
-
-      }
-
       @Override
       public AbstractPersistedAddressSetting 
recoverAddressSettings(SimpleString address) {
          return null;
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
index bc216d8a0b..d9bd173962 100644
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
@@ -46,6 +46,8 @@ import 
org.apache.activemq.artemis.core.journal.TestableJournal;
 import org.apache.activemq.artemis.core.journal.impl.JournalFile;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
 import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
+import 
org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.ReusableLatch;
@@ -389,16 +391,20 @@ public abstract class JournalImplTestBase extends 
ActiveMQTestBase {
    }
 
    protected void add(final long... arguments) throws Exception {
-      addWithSize(recordLength, arguments);
+      addWithSize(recordLength, null, arguments);
    }
 
    protected void addWithSize(final int size, final long... arguments) throws 
Exception {
+      addWithSize(size, null, arguments);
+   }
+
+   protected void addWithSize(final int size, OperationContext context, final 
long... arguments) throws Exception {
       for (long element : arguments) {
          byte[] record = generateRecord(size);
 
          beforeJournalOperation();
 
-         journal.appendAddRecord(element, (byte) 0, record, sync);
+         journal.appendAddRecord(element, (byte) 0, new 
ByteArrayEncoding(record), sync, context);
 
          records.add(new RecordInfo(element, (byte) 0, record, false, false, 
(short) 0));
       }
@@ -465,12 +471,16 @@ public abstract class JournalImplTestBase extends 
ActiveMQTestBase {
    }
 
    protected void addTx(final long txID, final long... arguments) throws 
Exception {
+      addTxWithSize(recordLength, txID, arguments);
+   }
+
+   protected void addTxWithSize(final int size, final long txID, final long... 
arguments) throws Exception {
       TransactionHolder tx = getTransaction(txID);
 
       for (long element : arguments) {
          // SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length +
          // SIZE_BYTE
-         byte[] record = generateRecord(recordLength - 
(JournalImpl.SIZE_ADD_RECORD_TX + 1));
+         byte[] record = generateRecord(size - (JournalImpl.SIZE_ADD_RECORD_TX 
+ 1));
 
          beforeJournalOperation();
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
index 71d17944e3..76e0c02687 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
@@ -345,11 +345,6 @@ public class SendAckFailTest extends SpawnedTestBase {
          return manager.getContext();
       }
 
-      @Override
-      public void lineUpContext() {
-         manager.lineUpContext();
-      }
-
       @Override
       public OperationContext newContext(Executor executor) {
          return manager.newContext(executor);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
index a5026dfb5b..3ef6922a17 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
@@ -496,6 +496,49 @@ public class NIOJournalCompactTest extends 
JournalImplTestBase {
       loadAndCheck();
    }
 
+   @Test
+   public void testCommitOnRolledBack() throws Exception {
+      ExecutorService executorService = Executors.newSingleThreadExecutor();
+      runAfter(executorService::shutdownNow);
+
+      setup(2, 60 * 1024, false);
+
+      createJournal();
+
+      startJournal();
+
+      load();
+
+      addTx(1, 2);
+      rollback(1);
+      startCompact();
+
+      OperationContextImpl context = new OperationContextImpl(executorService);
+      journal.appendCommitRecord(1, false, context);
+      CountDownLatch latch = new CountDownLatch(1);
+      AtomicInteger error = new AtomicInteger(0);
+      context.executeOnCompletion(new IOCallback() {
+         @Override
+         public void done() {
+            latch.countDown();
+         }
+
+         @Override
+         public void onError(int errorCode, String errorMessage) {
+            error.incrementAndGet();
+            latch.countDown();
+         }
+      });
+
+      assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+      assertEquals(1, error.get());
+
+      finishCompact();
+
+      stopJournal();
+   }
+
    @Test
    public void testCompactPrepareRestart2() throws Exception {
       setup(2, 60 * 1024, false);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/SessionCloseTimeoutTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/SessionCloseTimeoutTest.java
new file mode 100644
index 0000000000..91ec30cfea
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/SessionCloseTimeoutTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.persistence;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * A simple test-case used for documentation purposes.
+ */
+public class SessionCloseTimeoutTest extends ActiveMQTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   ActiveMQServer server;
+
+   @BeforeEach
+   protected void createServer() throws Exception {
+
+      ConfigurationImpl configuration = 
createBasicConfig(0).setJMXManagementEnabled(false).addAcceptorConfiguration(new
 TransportConfiguration(INVM_ACCEPTOR_FACTORY, generateInVMParams(0), "invm"));
+      
configuration.setCriticalAnalyzer(true).setCriticalAnalyzerPolicy(CriticalAnalyzerPolicy.SHUTDOWN).setCriticalAnalyzerTimeout(1000);
+
+      HashMap<String, Object> extraConfig = new HashMap<>();
+      HashMap<String, Object> regularConfig = new HashMap<>();
+
+      configuration.addAcceptorConfiguration(new 
TransportConfiguration(NETTY_ACCEPTOR_FACTORY, regularConfig, "netty", 
extraConfig));
+      server = createServer(true, configuration);
+      server.start();
+   }
+
+
+   /**
+    * This is simulating a context that will never finish, the timeout should 
take care of it.
+    */
+   @Test
+   public void testSessionCloseTimeout() throws Exception {
+
+      int numberOfMessages = 100;
+
+      String queueName = getName();
+
+      final String tag = RandomUtil.randomAlphaNumericString(20);
+      AtomicInteger frozenSessions = new AtomicInteger(0);
+
+      try (AssertionLoggerHandler loggerHandler = new 
AssertionLoggerHandler()) {
+
+         org.apache.activemq.artemis.core.server.Queue serverQueue = 
server.createQueue(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
+         ConnectionFactory connectionFactory = 
CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616");
+         try (Connection connection = connectionFactory.createConnection()) {
+            connection.setClientID(tag);
+            connection.start();
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue(queueName);
+
+            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) 
session.createConsumer(queue);
+
+            MessageProducer producer = session.createProducer(queue);
+            for (int i = 0; i < numberOfMessages; i++) {
+               TextMessage message = session.createTextMessage("hello " + i);
+               message.setIntProperty("i", i);
+               producer.send(message);
+            }
+            session.commit();
+
+            Wait.assertEquals(numberOfMessages, serverQueue::getMessageCount, 
5000);
+
+            Wait.waitFor(() -> serverQueue.getDeliveringCount() > 1);
+            assertNotNull(consumer.receive(5000));
+
+            server.getRemotingService().getConnections().stream().filter(r -> 
String.valueOf(r.getClientID()).equals(tag)).forEach(r -> {
+               server.getSessions().stream().filter(s -> 
s.getRemotingConnection() == r).forEach(s -> {
+                  // this will make the context to never finish
+                  s.getSessionContext().storeLineUp();
+                  frozenSessions.incrementAndGet();
+               });
+               r.fail(new ActiveMQException("fail"));
+               r.getTransportConnection().disconnect();
+            });
+         }
+
+         assertTrue(frozenSessions.get() > 0);
+
+         Wait.assertFalse(server::isStarted);
+
+         assertTrue(loggerHandler.findText("AMQ224107"), "Critical Analyzer 
supposed to happen");
+
+         createServer();
+
+         connectionFactory = CFUtil.createConnectionFactory("CORE", 
"tcp://localhost:61616");
+
+         try (Connection connection = connectionFactory.createConnection()) {
+            connection.start();
+            Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+            MessageConsumer consumer = 
session.createConsumer(session.createQueue(queueName));
+            for (int i = 0; i < numberOfMessages; i++) {
+               TextMessage message = (TextMessage) consumer.receive(5000);
+               assertNotNull(message);
+               assertEquals(i, message.getIntProperty("i"));
+            }
+         }
+      }
+   }
+}
\ No newline at end of file
diff --git 
a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/journal/JournalImplTestUnit.java
 
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/journal/JournalImplTestUnit.java
index c94ffac741..7031497bde 100644
--- 
a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/journal/JournalImplTestUnit.java
+++ 
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/journal/JournalImplTestUnit.java
@@ -18,18 +18,26 @@ package 
org.apache.activemq.artemis.tests.performance.journal;
 
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
 import 
org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestBase;
 import 
org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.SimpleEncoding;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public abstract class JournalImplTestUnit extends JournalImplTestBase {
 
@@ -83,6 +91,109 @@ public abstract class JournalImplTestUnit extends 
JournalImplTestBase {
 
    }
 
+   @Test
+   public void testCommitOnError() throws Exception {
+      ExecutorService executorService = Executors.newSingleThreadExecutor();
+      runAfter(executorService::shutdownNow);
+      setup(10, 10 * 1024 * 1024, true);
+      createJournal();
+      startJournal();
+      load();
+      addTx(1, 1L);
+      ((JournalImpl)journal).markTXError(1, new Exception("test"));
+      OperationContextImpl context = new OperationContextImpl(executorService);
+      Assertions.assertThrows(Exception.class, () -> {
+         journal.appendCommitRecord(1, true, context, true);
+      });
+      CountDownLatch latch  = new CountDownLatch(1);
+      context.executeOnCompletion(new IOCompletion() {
+         @Override
+         public void storeLineUp() {
+
+         }
+
+         @Override
+         public void done() {
+            latch.countDown();
+         }
+
+         @Override
+         public void onError(int errorCode, String errorMessage) {
+
+         }
+      });
+
+      assertTrue(latch.await(5, TimeUnit.SECONDS));
+   }
+
+
+   @Test
+   public void testBiggerRecordTX() throws Exception {
+      ExecutorService executorService = Executors.newSingleThreadExecutor();
+      runAfter(executorService::shutdownNow);
+      setup(10, 10 * 1024 * 1024, true);
+      createJournal();
+      startJournal();
+      load();
+      OperationContextImpl context = new OperationContextImpl(executorService);
+      Assertions.assertThrows(Exception.class, () -> {
+         addTxWithSize(1024 * 1024, 1, 1);
+      });
+      CountDownLatch latch  = new CountDownLatch(1);
+      context.executeOnCompletion(new IOCompletion() {
+         @Override
+         public void storeLineUp() {
+
+         }
+
+         @Override
+         public void done() {
+            latch.countDown();
+         }
+
+         @Override
+         public void onError(int errorCode, String errorMessage) {
+
+         }
+      });
+
+      assertTrue(latch.await(5, TimeUnit.SECONDS));
+   }
+
+
+   @Test
+   public void testBiggerRecord() throws Exception {
+      ExecutorService executorService = Executors.newSingleThreadExecutor();
+      runAfter(executorService::shutdownNow);
+      setup(10, 10 * 1024 * 1024, true);
+      createJournal();
+      startJournal();
+      load();
+      OperationContextImpl context = new OperationContextImpl(executorService);
+      Assertions.assertThrows(Exception.class, () -> {
+         addWithSize(1024 * 1024, context, 1, 1);
+      });
+      CountDownLatch latch  = new CountDownLatch(1);
+      context.executeOnCompletion(new IOCompletion() {
+         @Override
+         public void storeLineUp() {
+
+         }
+
+         @Override
+         public void done() {
+            latch.countDown();
+         }
+
+         @Override
+         public void onError(int errorCode, String errorMessage) {
+            new Exception("errorCode=" + errorCode).printStackTrace();
+         }
+      });
+
+      assertTrue(latch.await(5, TimeUnit.SECONDS));
+   }
+
    @Test
    public void testAddUpdateDeleteManySmallFileSize() throws Exception {
       final int numberAdds = 1000;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact



Reply via email to