This is an automated email from the ASF dual-hosted git repository.
brusdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new c9d9ace4c6 ARTEMIS-5694 CriticalCanalyzer to detect session.close
timing out.
c9d9ace4c6 is described below
commit c9d9ace4c6c6b654f01c75519e2e524389009e75
Author: Clebert Suconic <[email protected]>
AuthorDate: Fri Oct 3 12:16:31 2025 -0400
ARTEMIS-5694 CriticalCanalyzer to detect session.close timing out.
Also fixing a few cases where a context would hang
---
.../utils/critical/CriticalComponentImpl.java | 14 ++
.../artemis/core/journal/impl/JournalImpl.java | 29 ++--
.../artemis/core/persistence/StorageManager.java | 2 -
.../journal/AbstractJournalStorageManager.java | 10 --
.../impl/nullpm/NullStorageManager.java | 4 -
.../core/server/impl/ServerSessionImpl.java | 85 ++++++++----
.../core/transaction/impl/TransactionImplTest.java | 5 -
.../core/journal/impl/JournalImplTestBase.java | 16 ++-
.../tests/integration/client/SendAckFailTest.java | 5 -
.../integration/journal/NIOJournalCompactTest.java | 43 ++++++
.../persistence/SessionCloseTimeoutTest.java | 149 +++++++++++++++++++++
.../performance/journal/JournalImplTestUnit.java | 111 +++++++++++++++
12 files changed, 411 insertions(+), 62 deletions(-)
diff --git
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java
index 9daa4763c8..5ec13a4d89 100644
---
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java
+++
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java
@@ -55,6 +55,19 @@ public class CriticalComponentImpl implements
CriticalComponent {
}
}
+ protected void enterCritical(int path) {
+ if (analyzer.isMeasuring()) {
+ measures[path].enterCritical();
+ }
+ }
+
+ protected void leaveCritical(int path) {
+ if (analyzer.isMeasuring()) {
+ measures[path].leaveCritical();
+ }
+ }
+
+
@Override
public boolean checkExpiration(long timeout, boolean reset) {
for (int i = 0; i < measures.length; i++) {
@@ -64,4 +77,5 @@ public class CriticalComponentImpl implements
CriticalComponent {
}
return false;
}
+
}
diff --git
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index ed57e385e2..1d67bce88c 100644
---
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -919,7 +919,6 @@ public class JournalImpl extends JournalBase implements
TestableJournal, Journal
final boolean sync,
final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
- lineUpContext(callback);
if (logger.isTraceEnabled()) {
logger.trace("scheduling appendAddRecord::id={}, userRecordType={},
record = {}", id, recordType, record);
@@ -930,6 +929,8 @@ public class JournalImpl extends JournalBase implements
TestableJournal, Journal
checkRecordSize(addRecordEncodeSize, record);
+ lineUpContext(callback);
+
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync,
callback);
appendExecutor.execute(() -> {
journalLock.readLock().lock();
@@ -966,7 +967,6 @@ public class JournalImpl extends JournalBase implements
TestableJournal, Journal
final boolean sync,
final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
- lineUpContext(callback);
if (logger.isTraceEnabled()) {
logger.trace("scheduling appendAddEvent::id={}, userRecordType={},
record = {}", id, recordType, record);
@@ -976,6 +976,8 @@ public class JournalImpl extends JournalBase implements
TestableJournal, Journal
checkRecordSize(addRecord.getEncodeSize(), record);
+ lineUpContext(callback);
+
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync,
callback);
appendExecutor.execute(() -> {
journalLock.readLock().lock();
@@ -1022,7 +1024,6 @@ public class JournalImpl extends JournalBase implements
TestableJournal, Journal
final boolean sync,
final IOCompletion callback) throws
Exception {
checkJournalIsLoaded();
- lineUpContext(callback);
if (logger.isTraceEnabled()) {
logger.trace("scheduling appendUpdateRecord::id={},
userRecordType={}", id, recordType);
@@ -1036,6 +1037,8 @@ public class JournalImpl extends JournalBase implements
TestableJournal, Journal
onFoundAddInfo = new SimpleFutureImpl<>();
}
+ lineUpContext(callback);
+
if (onFoundAddInfo == null) {
internalAppendUpdateRecord(id, recordType, persister, record, false,
false, null, callback);
} else {
@@ -1142,7 +1145,6 @@ public class JournalImpl extends JournalBase implements
TestableJournal, Journal
}
checkJournalIsLoaded();
- lineUpContext(callback);
final SimpleFuture<Boolean> onFoundAddInfo;
@@ -1152,6 +1154,8 @@ public class JournalImpl extends JournalBase implements
TestableJournal, Journal
onFoundAddInfo = new SimpleFutureImpl<>();
}
+ lineUpContext(callback);
+
if (onFoundAddInfo == null) {
internalAppendDeleteRecord(id, false, null, callback);
} else {
@@ -1448,13 +1452,13 @@ public class JournalImpl extends JournalBase implements
TestableJournal, Journal
}
}
- private void setErrorCondition(IOCallback otherCallback, JournalTransaction
jt, Throwable t) {
+ private void setErrorCondition(IOCallback ioCallback, JournalTransaction
jt, Throwable t) {
if (jt != null) {
jt.onError(ActiveMQExceptionType.IO_ERROR.getCode(), t.getMessage());
}
- if (otherCallback != null) {
- otherCallback.onError(ActiveMQExceptionType.IO_ERROR.getCode(),
t.getMessage());
+ if (ioCallback != null) {
+ ioCallback.onError(ActiveMQExceptionType.IO_ERROR.getCode(),
t.getMessage());
}
}
@@ -1467,9 +1471,6 @@ public class JournalImpl extends JournalBase implements
TestableJournal, Journal
final IOCompletion callback,
final boolean lineUpContext) throws
Exception {
checkJournalIsLoaded();
- if (lineUpContext) {
- lineUpContext(callback);
- }
if (logger.isTraceEnabled()) {
logger.trace("scheduling appendCommitRecord::txID={}", txID);
@@ -1480,6 +1481,9 @@ public class JournalImpl extends JournalBase implements
TestableJournal, Journal
txcheck.checkErrorCondition();
}
+ if (lineUpContext) {
+ lineUpContext(callback);
+ }
final SimpleFuture<JournalTransaction> result =
newSyncAndCallbackResult(sync, callback);
@@ -3523,4 +3527,9 @@ public class JournalImpl extends JournalBase implements
TestableJournal, Journal
public int getCompactCount() {
return compactCount;
}
+
+ public void markTXError(long txID, Throwable t) {
+ JournalTransaction tx = transactions.get(txID);
+ tx.onError(-1, t.getMessage());
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index a4a11359e0..9ff3f8ea38 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -117,8 +117,6 @@ public interface StorageManager extends MapStorageManager,
IDGenerator, ActiveMQ
*/
OperationContext getContext();
- void lineUpContext();
-
/**
* It just creates an OperationContext without associating it
*/
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index c3fc7653ae..4f6fbe0e19 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1763,16 +1763,6 @@ public abstract class AbstractJournalStorageManager
extends CriticalComponentImp
persistedKeyValuePairs.put(keyValuePair.getKey(), keyValuePair);
}
- @Override
- public void lineUpContext() {
- try (ArtemisCloseable lock = closeableReadLock()) {
- messageJournal.lineUpContext(getContext());
- }
- }
-
- // ActiveMQComponent implementation
- // ------------------------------------------------------
-
protected abstract void beforeStart() throws Exception;
@Override
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index 89f084685b..47daeae68b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -647,10 +647,6 @@ public class NullStorageManager implements StorageManager {
public void commit(final long txID, final boolean lineUpContext) throws
Exception {
}
- @Override
- public void lineUpContext() {
- }
-
@Override
public void deletePendingLargeMessage(final long recordID) throws Exception
{
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 4c112e5990..1808ba85e2 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -108,6 +108,7 @@ import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.PrefixUtil;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
+import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.apache.activemq.artemis.utils.runnables.RunnableList;
import org.slf4j.Logger;
@@ -116,10 +117,12 @@ import org.slf4j.LoggerFactory;
/**
* Server side Session implementation
*/
-public class ServerSessionImpl implements ServerSession, FailureListener {
+public class ServerSessionImpl extends CriticalComponentImpl implements
ServerSession, FailureListener {
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final int CRITICAL_PATH_CLOSE = 0;
+
private boolean securityEnabled = true;
private final String securityDomain;
@@ -202,6 +205,9 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
// server. Both the request and failure listener will
// try to close one session from different threads
// concurrently.
+ private volatile boolean closing = false;
+
+ // When the doClose is called, we will make it actually closed
private volatile boolean closed = false;
private boolean prefixEnabled = false;
@@ -237,6 +243,8 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
final Map<SimpleString, RoutingType> prefixes,
final String securityDomain,
boolean isLegacyProducer) throws Exception {
+ super(server.getCriticalAnalyzer(), 1);
+
this.username = username;
this.password = password;
@@ -339,7 +347,7 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
@Override
public boolean isClosed() {
- return closed;
+ return closing;
}
@Override
@@ -404,6 +412,10 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
callback.close(failed);
}
synchronized (this) {
+ if (closed) {
+ return;
+ }
+ closed = true;
if (server.hasBrokerSessionPlugins()) {
server.callBrokerSessionPlugins(plugin ->
plugin.beforeCloseSession(this, failed));
}
@@ -609,7 +621,7 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
ServerConsumer consumer;
synchronized (this) {
- if (closed) {
+ if (closing) {
throw
ActiveMQMessageBundle.BUNDLE.cannotCreateConsumerOnClosedSession(queueName);
}
consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding)
binding, filter, priority, started, browseOnly, storageManager, callback,
preAcknowledge, strictUpdateDeliveryCount, managementService,
supportLargeMessage, credits, server);
@@ -1777,37 +1789,47 @@ public class ServerSessionImpl implements
ServerSession, FailureListener {
@Override
public void close(final boolean failed, final boolean force) {
synchronized (this) {
- if (closed) {
+ if (closing) {
return;
}
- closed = true;
+ closing = true;
}
if (force) {
context.reset();
}
+ // We only add the session as component on the critical analyzer
+ // while the close is happening between the user's thread and the
context's thread.
+ // Once finishClose is called to complete the operation, leaveCritical
is called
+ // and the session is removed from the component's list on the critical
analyzer.
+ enterCritical(CRITICAL_PATH_CLOSE);
+ getCriticalAnalyzer().add(this);
+
context.executeOnCompletion(new IOCallback() {
@Override
public void onError(int errorCode, String errorMessage) {
- callDoClose();
+ finishClose(failed);
}
@Override
public void done() {
- callDoClose();
- }
-
- private void callDoClose() {
- try {
- doClose(failed);
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.errorClosingSession(e);
- }
+ finishClose(failed);
}
});
}
+ private void finishClose(boolean failed) {
+ try {
+ doClose(failed);
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.errorClosingSession(e);
+ } finally {
+ leaveCritical(CRITICAL_PATH_CLOSE);
+ getCriticalAnalyzer().remove(this);
+ }
+ }
+
@Override
public void closeConsumer(final long consumerID) throws Exception {
final ServerConsumer consumer = locateConsumer(consumerID);
@@ -2192,6 +2214,30 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
+ try {
+ long txID;
+ if (tx != null) {
+ txID = tx.getID();
+ } else {
+ txID = -1L;
+ }
+ sb.append("name=").append(name).append(",");
+ sb.append("consumers=").append(consumers.size()).append(",");
+ sb.append("txID=").append(txID).append(",");
+ sb.append("remotingConnection=").append(remotingConnection);
+ } catch (Throwable justLogit) {
+ logger.debug(justLogit.getMessage(), justLogit);
+ }
+
+ insertMetadata(sb);
+
+ // This will actually appear on some management operations
+ // so please don't clog this with debug objects
+ // unless you provide a special way for management to translate sessions
+ return "ServerSessionImpl(" + sb + ")";
+ }
+
+ private void insertMetadata(StringBuilder sb) {
if (this.metaData != null) {
for (Map.Entry<String, String> value : metaData.entrySet()) {
if (!sb.isEmpty()) {
@@ -2205,22 +2251,15 @@ public class ServerSessionImpl implements
ServerSession, FailureListener {
}
}
}
- // This will actually appear on some management operations
- // so please don't clog this with debug objects
- // unless you provide a special way for management to translate sessions
- return "ServerSessionImpl(" + sb.toString() + ")";
}
- // FailureListener implementation
- // --------------------------------------------------------------------
-
@Override
public void connectionFailed(final ActiveMQException me, boolean
failedOver) {
/*
* This can be invoked from Netty (via channelInactive) when the
connection has already been closed causing
* spurious logging about clearing up resources for failed client
connections.
*/
- if (closed)
+ if (closing)
return;
try {
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index 0430348967..524f765628 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -332,11 +332,6 @@ public class TransactionImplTest extends ServerTestBase {
return null;
}
- @Override
- public void lineUpContext() {
-
- }
-
@Override
public AbstractPersistedAddressSetting
recoverAddressSettings(SimpleString address) {
return null;
diff --git
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
index bc216d8a0b..d9bd173962 100644
---
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
+++
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
@@ -46,6 +46,8 @@ import
org.apache.activemq.artemis.core.journal.TestableJournal;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
+import
org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ReusableLatch;
@@ -389,16 +391,20 @@ public abstract class JournalImplTestBase extends
ActiveMQTestBase {
}
protected void add(final long... arguments) throws Exception {
- addWithSize(recordLength, arguments);
+ addWithSize(recordLength, null, arguments);
}
protected void addWithSize(final int size, final long... arguments) throws
Exception {
+ addWithSize(size, null, arguments);
+ }
+
+ protected void addWithSize(final int size, OperationContext context, final
long... arguments) throws Exception {
for (long element : arguments) {
byte[] record = generateRecord(size);
beforeJournalOperation();
- journal.appendAddRecord(element, (byte) 0, record, sync);
+ journal.appendAddRecord(element, (byte) 0, new
ByteArrayEncoding(record), sync, context);
records.add(new RecordInfo(element, (byte) 0, record, false, false,
(short) 0));
}
@@ -465,12 +471,16 @@ public abstract class JournalImplTestBase extends
ActiveMQTestBase {
}
protected void addTx(final long txID, final long... arguments) throws
Exception {
+ addTxWithSize(recordLength, txID, arguments);
+ }
+
+ protected void addTxWithSize(final int size, final long txID, final long...
arguments) throws Exception {
TransactionHolder tx = getTransaction(txID);
for (long element : arguments) {
// SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length +
// SIZE_BYTE
- byte[] record = generateRecord(recordLength -
(JournalImpl.SIZE_ADD_RECORD_TX + 1));
+ byte[] record = generateRecord(size - (JournalImpl.SIZE_ADD_RECORD_TX
+ 1));
beforeJournalOperation();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
index 71d17944e3..76e0c02687 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
@@ -345,11 +345,6 @@ public class SendAckFailTest extends SpawnedTestBase {
return manager.getContext();
}
- @Override
- public void lineUpContext() {
- manager.lineUpContext();
- }
-
@Override
public OperationContext newContext(Executor executor) {
return manager.newContext(executor);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
index a5026dfb5b..3ef6922a17 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java
@@ -496,6 +496,49 @@ public class NIOJournalCompactTest extends
JournalImplTestBase {
loadAndCheck();
}
+ @Test
+ public void testCommitOnRolledBack() throws Exception {
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ runAfter(executorService::shutdownNow);
+
+ setup(2, 60 * 1024, false);
+
+ createJournal();
+
+ startJournal();
+
+ load();
+
+ addTx(1, 2);
+ rollback(1);
+ startCompact();
+
+ OperationContextImpl context = new OperationContextImpl(executorService);
+ journal.appendCommitRecord(1, false, context);
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicInteger error = new AtomicInteger(0);
+ context.executeOnCompletion(new IOCallback() {
+ @Override
+ public void done() {
+ latch.countDown();
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ error.incrementAndGet();
+ latch.countDown();
+ }
+ });
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+ assertEquals(1, error.get());
+
+ finishCompact();
+
+ stopJournal();
+ }
+
@Test
public void testCompactPrepareRestart2() throws Exception {
setup(2, 60 * 1024, false);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/SessionCloseTimeoutTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/SessionCloseTimeoutTest.java
new file mode 100644
index 0000000000..91ec30cfea
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/SessionCloseTimeoutTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.persistence;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * A simple test-case used for documentation purposes.
+ */
+public class SessionCloseTimeoutTest extends ActiveMQTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ ActiveMQServer server;
+
+ @BeforeEach
+ protected void createServer() throws Exception {
+
+ ConfigurationImpl configuration =
createBasicConfig(0).setJMXManagementEnabled(false).addAcceptorConfiguration(new
TransportConfiguration(INVM_ACCEPTOR_FACTORY, generateInVMParams(0), "invm"));
+
configuration.setCriticalAnalyzer(true).setCriticalAnalyzerPolicy(CriticalAnalyzerPolicy.SHUTDOWN).setCriticalAnalyzerTimeout(1000);
+
+ HashMap<String, Object> extraConfig = new HashMap<>();
+ HashMap<String, Object> regularConfig = new HashMap<>();
+
+ configuration.addAcceptorConfiguration(new
TransportConfiguration(NETTY_ACCEPTOR_FACTORY, regularConfig, "netty",
extraConfig));
+ server = createServer(true, configuration);
+ server.start();
+ }
+
+
+ /**
+ * This is simulating a context that will never finish, the timeout should
take care of it.
+ */
+ @Test
+ public void testSessionCloseTimeout() throws Exception {
+
+ int numberOfMessages = 100;
+
+ String queueName = getName();
+
+ final String tag = RandomUtil.randomAlphaNumericString(20);
+ AtomicInteger frozenSessions = new AtomicInteger(0);
+
+ try (AssertionLoggerHandler loggerHandler = new
AssertionLoggerHandler()) {
+
+ org.apache.activemq.artemis.core.server.Queue serverQueue =
server.createQueue(QueueConfiguration.of(queueName).setRoutingType(RoutingType.ANYCAST));
+ ConnectionFactory connectionFactory =
CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616");
+ try (Connection connection = connectionFactory.createConnection()) {
+ connection.setClientID(tag);
+ connection.start();
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(queueName);
+
+ ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)
session.createConsumer(queue);
+
+ MessageProducer producer = session.createProducer(queue);
+ for (int i = 0; i < numberOfMessages; i++) {
+ TextMessage message = session.createTextMessage("hello " + i);
+ message.setIntProperty("i", i);
+ producer.send(message);
+ }
+ session.commit();
+
+ Wait.assertEquals(numberOfMessages, serverQueue::getMessageCount,
5000);
+
+ Wait.waitFor(() -> serverQueue.getDeliveringCount() > 1);
+ assertNotNull(consumer.receive(5000));
+
+ server.getRemotingService().getConnections().stream().filter(r ->
String.valueOf(r.getClientID()).equals(tag)).forEach(r -> {
+ server.getSessions().stream().filter(s ->
s.getRemotingConnection() == r).forEach(s -> {
+ // this will make the context to never finish
+ s.getSessionContext().storeLineUp();
+ frozenSessions.incrementAndGet();
+ });
+ r.fail(new ActiveMQException("fail"));
+ r.getTransportConnection().disconnect();
+ });
+ }
+
+ assertTrue(frozenSessions.get() > 0);
+
+ Wait.assertFalse(server::isStarted);
+
+ assertTrue(loggerHandler.findText("AMQ224107"), "Critical Analyzer
supposed to happen");
+
+ createServer();
+
+ connectionFactory = CFUtil.createConnectionFactory("CORE",
"tcp://localhost:61616");
+
+ try (Connection connection = connectionFactory.createConnection()) {
+ connection.start();
+ Session session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer =
session.createConsumer(session.createQueue(queueName));
+ for (int i = 0; i < numberOfMessages; i++) {
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ assertNotNull(message);
+ assertEquals(i, message.getIntProperty("i"));
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git
a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/journal/JournalImplTestUnit.java
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/journal/JournalImplTestUnit.java
index c94ffac741..7031497bde 100644
---
a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/journal/JournalImplTestUnit.java
+++
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/journal/JournalImplTestUnit.java
@@ -18,18 +18,26 @@ package
org.apache.activemq.artemis.tests.performance.journal;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import
org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestBase;
import
org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.SimpleEncoding;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public abstract class JournalImplTestUnit extends JournalImplTestBase {
@@ -83,6 +91,109 @@ public abstract class JournalImplTestUnit extends
JournalImplTestBase {
}
+ @Test
+ public void testCommitOnError() throws Exception {
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ runAfter(executorService::shutdownNow);
+ setup(10, 10 * 1024 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ addTx(1, 1L);
+ ((JournalImpl)journal).markTXError(1, new Exception("test"));
+ OperationContextImpl context = new OperationContextImpl(executorService);
+ Assertions.assertThrows(Exception.class, () -> {
+ journal.appendCommitRecord(1, true, context, true);
+ });
+ CountDownLatch latch = new CountDownLatch(1);
+ context.executeOnCompletion(new IOCompletion() {
+ @Override
+ public void storeLineUp() {
+
+ }
+
+ @Override
+ public void done() {
+ latch.countDown();
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+
+ }
+ });
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ }
+
+
+ @Test
+ public void testBiggerRecordTX() throws Exception {
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ runAfter(executorService::shutdownNow);
+ setup(10, 10 * 1024 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ OperationContextImpl context = new OperationContextImpl(executorService);
+ Assertions.assertThrows(Exception.class, () -> {
+ addTxWithSize(1024 * 1024, 1, 1);
+ });
+ CountDownLatch latch = new CountDownLatch(1);
+ context.executeOnCompletion(new IOCompletion() {
+ @Override
+ public void storeLineUp() {
+
+ }
+
+ @Override
+ public void done() {
+ latch.countDown();
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+
+ }
+ });
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ }
+
+
+ @Test
+ public void testBiggerRecord() throws Exception {
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ runAfter(executorService::shutdownNow);
+ setup(10, 10 * 1024 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ OperationContextImpl context = new OperationContextImpl(executorService);
+ Assertions.assertThrows(Exception.class, () -> {
+ addWithSize(1024 * 1024, context, 1, 1);
+ });
+ CountDownLatch latch = new CountDownLatch(1);
+ context.executeOnCompletion(new IOCompletion() {
+ @Override
+ public void storeLineUp() {
+
+ }
+
+ @Override
+ public void done() {
+ latch.countDown();
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ new Exception("errorCode=" + errorCode).printStackTrace();
+ }
+ });
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ }
+
@Test
public void testAddUpdateDeleteManySmallFileSize() throws Exception {
final int numberAdds = 1000;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact