This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 3ec0274356 ARTEMIS-4523 Openwire leaving consumers isolated after
reconnects
3ec0274356 is described below
commit 3ec0274356ef5adc0aa6a9ac53f5d28008305b95
Author: Clebert Suconic <[email protected]>
AuthorDate: Fri Dec 1 12:22:29 2023 -0500
ARTEMIS-4523 Openwire leaving consumers isolated after reconnects
co-authored with Gary Tully
---
.../artemis/utils/actors/ProcessorBase.java | 2 +-
.../artemis/utils/actors/ThresholdActor.java | 15 +-
.../artemis/utils/actors/ThresholdActorTest.java | 49 -----
.../core/protocol/openwire/OpenWireConnection.java | 53 ++----
.../protocol/openwire/OpenWireProtocolManager.java | 37 ++--
.../isolated/client/ConnectionDroppedTest.java | 58 ++++++
.../openwire/FastReconnectOpenWireTest.java | 211 +++++++++++++++++++++
.../artemis/tests/leak/MemoryAssertions.java | 2 +
8 files changed, 304 insertions(+), 123 deletions(-)
diff --git
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
index 0c4faa25f5..323da6a5d7 100644
---
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
+++
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
@@ -49,7 +49,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
// Request of forced shutdown
private volatile boolean requestedForcedShutdown = false;
// Request of educated shutdown:
- private volatile boolean requestedShutdown = false;
+ protected volatile boolean requestedShutdown = false;
// Request to yield to another thread
private volatile boolean yielded = false;
diff --git
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java
index 4e7bbb6ded..7b79d45ada 100644
---
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java
+++
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java
@@ -41,7 +41,6 @@ public class ThresholdActor<T> extends ProcessorBase<Object> {
private final ActorListener<T> listener;
private final Runnable overThreshold;
private final Runnable clearThreshold;
- private volatile Runnable shutdownTask;
public ThresholdActor(Executor parent, ActorListener<T> listener, int
maxSize, ToIntFunction<T> sizeGetter, Runnable overThreshold, Runnable
clearThreshold) {
super(parent);
@@ -54,10 +53,6 @@ public class ThresholdActor<T> extends ProcessorBase<Object>
{
@Override
protected final void doTask(Object task) {
- if (task == shutdownTask) {
- shutdownTask.run();
- return;
- }
if (task == FLUSH) {
clearThreshold.run();
// should set to 0 no matter the value. There's a single thread
setting this value back to zero
@@ -100,14 +95,8 @@ public class ThresholdActor<T> extends
ProcessorBase<Object> {
}
}
- public void shutdown(Runnable runnable) {
- // do no more pending work
+ public void requestShutdown() {
+ requestedShutdown = true;
tasks.clear();
- // run this task next
- shutdownTask = runnable;
- tasks.add(runnable);
- // wait for shutdown task to complete
- flush();
- shutdown();
}
}
diff --git
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java
index e554d632f7..5d7fe27c13 100644
---
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java
+++
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java
@@ -182,53 +182,4 @@ public class ThresholdActorTest {
executorService.shutdown();
}
}
-
-
- @Test
- public void testShutdownTask() throws Exception {
- AtomicInteger lastAcquireFailed = new AtomicInteger(0);
- lastProcessed.set(0);
-
- Semaphore allowedTasks = new Semaphore(10);
- CountDownLatch completedTasks = new CountDownLatch(11);
- CountDownLatch pendingTasks = new CountDownLatch(11);
-
- final ExecutorService executorService =
Executors.newSingleThreadExecutor();
-
- ThresholdActor<Integer> actor = new ThresholdActor<>(executorService,
(i) -> {
- try {
- pendingTasks.countDown();
- if (allowedTasks.tryAcquire(1, 200, TimeUnit.MILLISECONDS)) {
- lastProcessed.set(i);
- } else {
- lastAcquireFailed.set(i);
- }
- completedTasks.countDown();
- } catch (InterruptedException ignored) {
- }
-
- }, 1000, (e) -> {
- return 1;
- }, () -> {
- }, () -> {
- });
-
- // expect allowedTasks tasks to complete
- for (int i = 1; i < 100; i++) {
- actor.act(i);
- }
- // wait for task processing
- Assert.assertTrue(pendingTasks.await(4, TimeUnit.SECONDS));
-
- actor.shutdown(() -> {
- lastProcessed.set(lastProcessed.get() * 1000);
- });
-
- Assert.assertTrue(completedTasks.await(4, TimeUnit.SECONDS));
-
- // assert processing terminated at block point
- Assert.assertEquals(10000, lastProcessed.get());
- // pending task executed as expected
- Assert.assertEquals(11, lastAcquireFailed.get());
- }
}
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index ee895a51a1..670eca258d 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -17,7 +17,6 @@
package org.apache.activemq.artemis.core.protocol.openwire;
import javax.jms.IllegalStateException;
-import javax.jms.InvalidClientIDException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSSecurityException;
import javax.transaction.xa.XAException;
@@ -761,7 +760,11 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
final ThresholdActor<Command> localVisibleActor = openWireActor;
if (localVisibleActor != null) {
- localVisibleActor.shutdown(() -> doFail(me, message));
+ localVisibleActor.requestShutdown();
+ }
+
+ if (executor != null) {
+ executor.execute(() -> doFail(me, message));
} else {
doFail(me, message);
}
@@ -779,11 +782,16 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
}
try {
if (this.getConnectionInfo() != null) {
- protocolManager.removeConnection(this.getConnectionInfo(), me);
+ protocolManager.removeConnection(getClientID(), this);
}
- } catch (InvalidClientIDException e) {
- logger.warn("Couldn't close connection because invalid clientID", e);
} finally {
+ try {
+ disconnect(false);
+ } catch (Throwable e) {
+ // it should never happen, but never say never
+ logger.debug("OpenWireConnection::disconnect failure", e);
+ }
+
// there may be some transactions not associated with sessions
// deal with them after sessions are removed via connection removal
operationContext.executeOnCompletion(new IOCallback() {
@@ -876,29 +884,6 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
internalSession =
server.createSession(UUIDGenerator.getInstance().generateStringUUID(),
context.getUserName(), info.getPassword(), -1, this, true, false, false, false,
null, null, true, operationContext, protocolManager.getPrefixes(),
protocolManager.getSecurityDomain(), validatedUser, false);
}
- //raise the refCount of context
- public void reconnect(AMQConnectionContext existingContext, ConnectionInfo
info) throws Exception {
- this.context = existingContext;
- WireFormatInfo wireFormatInfo = inWireFormat.getPreferedWireFormatInfo();
- // Older clients should have been defaulting this field to true.. but
- // they were not.
- if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
- info.setClientMaster(true);
- }
- if (info.getClientIp() == null) {
- info.setClientIp(getRemoteAddress());
- }
- createInternalSession(info);
- state = new ConnectionState(info);
- state.reset(info);
-
- context.setConnection(this);
- context.setConnectionState(state);
- context.setClientMaster(info.isClientMaster());
- context.setFaultTolerant(info.isFaultTolerant());
- context.setReconnect(true);
- }
-
/**
* This will answer with commands to the client
*/
@@ -1817,13 +1802,13 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
@Override
public Response processRemoveConnection(ConnectionId id, long
lastDeliveredSequenceId) throws Exception {
//we let protocol manager to handle connection add/remove
+ for (SessionState sessionState : state.getSessionStates()) {
+ propagateLastSequenceId(sessionState, lastDeliveredSequenceId);
+ }
try {
- for (SessionState sessionState : state.getSessionStates()) {
- propagateLastSequenceId(sessionState, lastDeliveredSequenceId);
- }
- protocolManager.removeConnection(state.getInfo(), null);
- } catch (Throwable e) {
- // log
+ protocolManager.removeConnection(getClientID(),
OpenWireConnection.this);
+ } finally {
+ OpenWireConnection.this.disconnect(false);
}
return null;
}
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index f1fd7fb6e1..56ff48552c 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -109,7 +109,7 @@ public class OpenWireProtocolManager extends
AbstractProtocolManager<Command, O
private final CopyOnWriteArrayList<OpenWireConnection> connections = new
CopyOnWriteArrayList<>();
- private final Map<String, AMQConnectionContext> clientIdSet = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, OpenWireConnection> clientIdSet =
new ConcurrentHashMap<>();
private String brokerName;
@@ -244,18 +244,9 @@ public class OpenWireProtocolManager extends
AbstractProtocolManager<Command, O
}
}
- public void removeConnection(ConnectionInfo info, Throwable error) throws
InvalidClientIDException {
- String clientId = info.getClientId();
- if (clientId != null) {
- AMQConnectionContext context = this.clientIdSet.remove(clientId);
- if (context != null) {
- //connection is still there and need to close
- context.getConnection().disconnect(error != null);
- this.connections.remove(context.getConnection());
- }
- } else {
- throw new InvalidClientIDException("No clientID specified for
connection disconnect request");
- }
+ public void removeConnection(String clientID, OpenWireConnection
connection) {
+ clientIdSet.remove(clientID, connection);
+ connections.remove(connection);
}
/*** if set, the OpenWire connection will bypass the tcpReadBuferSize and
use this value instead.
@@ -421,22 +412,16 @@ public class OpenWireProtocolManager extends
AbstractProtocolManager<Command, O
}
AMQConnectionContext context;
- context = clientIdSet.get(clientId);
- if (context != null) {
- if (info.isFailoverReconnect()) {
- OpenWireConnection oldConnection = context.getConnection();
- oldConnection.disconnect(true);
- connections.remove(oldConnection);
- connection.reconnect(context, info);
- } else {
- throw new InvalidClientIDException("Broker: " + getBrokerName() +
" - Client: " + clientId + " already connected from " +
context.getConnection().getRemoteAddress());
+ OpenWireConnection oldConnection = clientIdSet.get(clientId);
+ if (oldConnection != null) {
+ if (!info.isFailoverReconnect()) {
+ throw new InvalidClientIDException("Broker: " + getBrokerName() +
" - Client: " + clientId + " already connected from " +
oldConnection.getRemoteAddress());
}
- } else {
- //new connection
- context = connection.initContext(info);
- clientIdSet.put(clientId, context);
}
+ context = connection.initContext(info);
+
+ clientIdSet.put(clientId, connection);
connections.add(connection);
ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
diff --git
a/tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/client/ConnectionDroppedTest.java
b/tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/client/ConnectionDroppedTest.java
index 542343fd80..df5e4f2f63 100644
---
a/tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/client/ConnectionDroppedTest.java
+++
b/tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/client/ConnectionDroppedTest.java
@@ -41,6 +41,8 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin;
import
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -498,4 +500,60 @@ public class ConnectionDroppedTest extends
ActiveMQTestBase {
}
+
+ @Test(timeout = 10_000)
+ public void testForceDropOpenWire() throws Throwable {
+ ActiveMQServer server = createServer(true, createDefaultConfig(true));
+ server.start();
+
+ Queue serverQueue = server.createQueue(new
QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+ CountDownLatch beforeCreateCalled = new CountDownLatch(1);
+ CountDownLatch goCreateConsumer = new CountDownLatch(1);
+ server.registerBrokerPlugin(new ActiveMQServerConsumerPlugin() {
+ @Override
+ public void afterCreateConsumer(ServerConsumer consumer) throws
ActiveMQException {
+ if (consumer.getQueue() == serverQueue) {
+ logger.info("Creating a consumer at {}", consumer.getQueue());
+ beforeCreateCalled.countDown();
+ try {
+ goCreateConsumer.await(5, TimeUnit.MINUTES);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ }
+ });
+
+ ExecutorService executorService = Executors.newFixedThreadPool(1);
+ runAfter(executorService::shutdownNow);
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory("OPENWIRE",
"tcp://localhost:61616");
+
+ CountDownLatch done = new CountDownLatch(1);
+
+ executorService.execute(() -> {
+ try (Connection connection = factory.createConnection();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer =
session.createConsumer(session.createQueue("test-queue"))) {
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ } finally {
+ done.countDown();
+ }
+ });
+
+ Assert.assertTrue(beforeCreateCalled.await(5, TimeUnit.MINUTES));
+
+ server.getRemotingService().getConnections().forEach(r -> {
+ r.fail(new ActiveMQException("this is a simulation"));
+ });
+
+ goCreateConsumer.countDown();
+
+ Wait.assertEquals(0, serverQueue::getConsumerCount);
+ }
+
+
+
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FastReconnectOpenWireTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FastReconnectOpenWireTest.java
new file mode 100644
index 0000000000..1a41b32580
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FastReconnectOpenWireTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FastReconnectOpenWireTest extends OpenWireTestBase {
+
+ // increase this number to make the test spin through more reconnect iterations
+ private static final int NUM_ITERATIONS = 50;
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @Override
+ protected void configureAddressSettings(Map<String, AddressSettings>
addressSettingsMap) {
+ super.configureAddressSettings(addressSettingsMap);
+ // force send to dlq early
+ addressSettingsMap.put("exampleQueue", new
AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new
SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(2));
+ // force send to dlq late
+ addressSettingsMap.put("exampleQueueTwo", new
AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new
SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(-1));
+ }
+
+
+ @Test(timeout = 60_000)
+ public void testFastReconnectCreateConsumerNoErrors() throws Exception {
+
+ final ArrayList<Throwable> errors = new ArrayList<>();
+ SimpleString durableQueue = new SimpleString("exampleQueueTwo");
+ this.server.createQueue(new
QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST));
+
+ Queue queue = new ActiveMQQueue(durableQueue.toString());
+
+ final ActiveMQConnectionFactory exFact = new
ActiveMQConnectionFactory("failover:(tcp://localhost:61616?closeAsync=false)?startupMaxReconnectAttempts=-1&maxReconnectAttempts=-1&timeout=5000");
+ exFact.setWatchTopicAdvisories(false);
+ exFact.setConnectResponseTimeout(10000);
+ exFact.setClientID("myID");
+
+ RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+ redeliveryPolicy.setRedeliveryDelay(0);
+ redeliveryPolicy.setMaximumRedeliveries(-1);
+ exFact.setRedeliveryPolicy(redeliveryPolicy);
+
+ publish(1000, durableQueue.toString());
+
+ final AtomicInteger numIterations = new AtomicInteger(NUM_ITERATIONS);
+ ExecutorService executor = Executors.newCachedThreadPool();
+ runAfter(executor::shutdownNow);
+
+ final int concurrent = 2;
+ final CountDownLatch done = new CountDownLatch(concurrent);
+ for (int i = 0; i < concurrent; i++) {
+ executor.execute(() -> {
+ try {
+ while (numIterations.decrementAndGet() > 0) {
+ try (Connection conn = exFact.createConnection(); Session
consumerConnectionSession = conn.createSession(Session.SESSION_TRANSACTED);
MessageConsumer messageConsumer =
consumerConnectionSession.createConsumer(queue)) {
+
+ messageConsumer.receiveNoWait();
+
+ try {
+ // force a local socket close such that the broker
sees an exception on the connection and fails the consumer via serverConsumer
close
+ ((ActiveMQConnection)
conn).getTransport().narrow(TcpTransport.class).stop();
+ } catch (Throwable expected) {
+ }
+
+ } catch (javax.jms.InvalidClientIDException expected) {
+ // deliberate clash across concurrent connections
+ } catch (Throwable unexpected) {
+ logger.warn(unexpected.getMessage(), unexpected);
+ errors.add(unexpected);
+ numIterations.set(0);
+ }
+ }
+ } finally {
+ done.countDown();
+ }
+ });
+ }
+
+ assertTrue(done.await(30, TimeUnit.SECONDS));
+
+ Wait.assertEquals(0, () ->
server.locateQueue(durableQueue).getConsumerCount(), 5000);
+ assertTrue(errors.isEmpty());
+
+ }
+
+ @Test(timeout = 60_000)
+ public void testFastReconnectCreateConsumerNoErrorsNoClientId() throws
Exception {
+
+ final ArrayList<Throwable> errors = new ArrayList<>();
+ SimpleString durableQueue = new SimpleString("exampleQueueTwo");
+ this.server.createQueue(new
QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST));
+
+ Queue queue = new ActiveMQQueue(durableQueue.toString());
+
+ final ActiveMQConnectionFactory exFact = new
ActiveMQConnectionFactory("failover:(tcp://localhost:61616?closeAsync=false)?startupMaxReconnectAttempts=-1&maxReconnectAttempts=-1&timeout=5000");
+ exFact.setWatchTopicAdvisories(false);
+ exFact.setConnectResponseTimeout(10000);
+
+ RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+ redeliveryPolicy.setRedeliveryDelay(0);
+ redeliveryPolicy.setMaximumRedeliveries(-1);
+ exFact.setRedeliveryPolicy(redeliveryPolicy);
+
+ publish(1000, durableQueue.toString());
+
+ final AtomicInteger numIterations = new AtomicInteger(NUM_ITERATIONS);
+ ExecutorService executor = Executors.newCachedThreadPool();
+ runAfter(executor::shutdownNow);
+
+ final int concurrent = 2;
+ CountDownLatch done = new CountDownLatch(concurrent);
+ for (int i = 0; i < concurrent; i++) {
+ executor.execute(() -> {
+ try (Connection conn = exFact.createConnection();
+ Session consumerConnectionSession =
conn.createSession(Session.SESSION_TRANSACTED);
+ MessageConsumer messageConsumer =
consumerConnectionSession.createConsumer(queue)
+ ) {
+ conn.start();
+ while (numIterations.decrementAndGet() > 0) {
+ try {
+ messageConsumer.receiveNoWait();
+
+ try {
+ // force a local socket close such that the broker
sees an exception on the connection and fails the consumer via serverConsumer
close
+ ((ActiveMQConnection)
conn).getTransport().narrow(TcpTransport.class).stop();
+ } catch (Throwable expected) {
+ }
+ } catch (Throwable unexpected) {
+ errors.add(unexpected);
+ }
+ }
+ } catch (Throwable unexpected) {
+ numIterations.set(0);
+ unexpected.printStackTrace();
+ errors.add(unexpected);
+ } finally {
+ done.countDown();
+ }
+ });
+ }
+
+ assertTrue(done.await(30, TimeUnit.SECONDS));
+
+ Wait.assertEquals(0, () ->
server.locateQueue(durableQueue).getConsumerCount(), 5000);
+
+ assertTrue(errors.isEmpty());
+
+ }
+
+ private void publish(int numMessages, String name) throws Exception {
+ final ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
+ Connection exConn = exFact.createConnection();
+ exConn.start();
+
+ Queue queue = new ActiveMQQueue(name);
+
+ // put a few messages on the queue to have the broker do some dispatch
work
+ Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(queue);
+
+ TextMessage message = session.createTextMessage("This is a text
message");
+ for (int i = 0; i < numMessages; i++) {
+ message.setIntProperty("SEQ", i);
+ producer.send(message);
+ }
+ session.close();
+ exConn.close();
+
+ }
+}
diff --git
a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
index dc926e517c..0345e1c7b2 100644
---
a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
+++
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
@@ -47,6 +47,8 @@ public class MemoryAssertions {
assertMemory(checkLeak, 0, ProtonServerReceiverContext.class.getName());
assertMemory(checkLeak, 0,
ActiveMQProtonRemotingConnection.class.getName());
assertMemory(checkLeak, 0, RemotingConnectionImpl.class.getName());
+ assertMemory(checkLeak, 0, OpenWireConnection.class.getName());
+ assertMemory(checkLeak, 0,
ActiveMQProtonRemotingConnection.class.getName());
assertMemory(checkLeak, 0, ServerSessionImpl.class.getName());
assertMemory(checkLeak, 0, AMQPSessionContext.class.getName());
assertMemory(checkLeak, 0, ServerConsumerImpl.class.getName());