This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ec0274356 ARTEMIS-4523 Openwire leaving consumers isolated after 
reconnects
3ec0274356 is described below

commit 3ec0274356ef5adc0aa6a9ac53f5d28008305b95
Author: Clebert Suconic <[email protected]>
AuthorDate: Fri Dec 1 12:22:29 2023 -0500

    ARTEMIS-4523 Openwire leaving consumers isolated after reconnects
    
    co-authored with Gary Tully
---
 .../artemis/utils/actors/ProcessorBase.java        |   2 +-
 .../artemis/utils/actors/ThresholdActor.java       |  15 +-
 .../artemis/utils/actors/ThresholdActorTest.java   |  49 -----
 .../core/protocol/openwire/OpenWireConnection.java |  53 ++----
 .../protocol/openwire/OpenWireProtocolManager.java |  37 ++--
 .../isolated/client/ConnectionDroppedTest.java     |  58 ++++++
 .../openwire/FastReconnectOpenWireTest.java        | 211 +++++++++++++++++++++
 .../artemis/tests/leak/MemoryAssertions.java       |   2 +
 8 files changed, 304 insertions(+), 123 deletions(-)

diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
index 0c4faa25f5..323da6a5d7 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
@@ -49,7 +49,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
    // Request of forced shutdown
    private volatile boolean requestedForcedShutdown = false;
    // Request of educated shutdown:
-   private volatile boolean requestedShutdown = false;
+   protected volatile boolean requestedShutdown = false;
    // Request to yield to another thread
    private volatile boolean yielded = false;
 
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java
index 4e7bbb6ded..7b79d45ada 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java
@@ -41,7 +41,6 @@ public class ThresholdActor<T> extends ProcessorBase<Object> {
    private final ActorListener<T> listener;
    private final Runnable overThreshold;
    private final Runnable clearThreshold;
-   private volatile Runnable shutdownTask;
 
    public ThresholdActor(Executor parent, ActorListener<T> listener, int 
maxSize, ToIntFunction<T> sizeGetter, Runnable overThreshold, Runnable 
clearThreshold) {
       super(parent);
@@ -54,10 +53,6 @@ public class ThresholdActor<T> extends ProcessorBase<Object> 
{
 
    @Override
    protected final void doTask(Object task) {
-      if (task == shutdownTask) {
-         shutdownTask.run();
-         return;
-      }
       if (task == FLUSH) {
          clearThreshold.run();
          // should set to 0 no matter the value. There's a single thread 
setting this value back to zero
@@ -100,14 +95,8 @@ public class ThresholdActor<T> extends 
ProcessorBase<Object> {
       }
    }
 
-   public void shutdown(Runnable runnable) {
-      // do no more pending work
+   public void requestShutdown() {
+      requestedShutdown = true;
       tasks.clear();
-      // run this task next
-      shutdownTask = runnable;
-      tasks.add(runnable);
-      // wait for shutdown task to complete
-      flush();
-      shutdown();
    }
 }
diff --git 
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java
 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java
index e554d632f7..5d7fe27c13 100644
--- 
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java
+++ 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java
@@ -182,53 +182,4 @@ public class ThresholdActorTest {
          executorService.shutdown();
       }
    }
-
-
-   @Test
-   public void testShutdownTask() throws Exception {
-      AtomicInteger lastAcquireFailed = new AtomicInteger(0);
-      lastProcessed.set(0);
-
-      Semaphore allowedTasks = new Semaphore(10);
-      CountDownLatch completedTasks = new CountDownLatch(11);
-      CountDownLatch pendingTasks = new CountDownLatch(11);
-
-      final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
-
-      ThresholdActor<Integer> actor = new ThresholdActor<>(executorService, 
(i) -> {
-         try {
-            pendingTasks.countDown();
-            if (allowedTasks.tryAcquire(1, 200, TimeUnit.MILLISECONDS)) {
-               lastProcessed.set(i);
-            } else {
-               lastAcquireFailed.set(i);
-            }
-            completedTasks.countDown();
-         } catch (InterruptedException ignored) {
-         }
-
-      }, 1000, (e) -> {
-         return 1;
-      }, () -> {
-      }, () -> {
-      });
-
-      // expect allowedTasks tasks to complete
-      for (int i = 1; i < 100; i++) {
-         actor.act(i);
-      }
-      // wait for task processing
-      Assert.assertTrue(pendingTasks.await(4, TimeUnit.SECONDS));
-
-      actor.shutdown(() -> {
-         lastProcessed.set(lastProcessed.get() * 1000);
-      });
-
-      Assert.assertTrue(completedTasks.await(4, TimeUnit.SECONDS));
-
-      // assert processing terminated at block point
-      Assert.assertEquals(10000, lastProcessed.get());
-      // pending task executed as expected
-      Assert.assertEquals(11, lastAcquireFailed.get());
-   }
 }
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index ee895a51a1..670eca258d 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.artemis.core.protocol.openwire;
 
 import javax.jms.IllegalStateException;
-import javax.jms.InvalidClientIDException;
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSSecurityException;
 import javax.transaction.xa.XAException;
@@ -761,7 +760,11 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
 
       final ThresholdActor<Command> localVisibleActor = openWireActor;
       if (localVisibleActor != null) {
-         localVisibleActor.shutdown(() -> doFail(me, message));
+         localVisibleActor.requestShutdown();
+      }
+
+      if (executor != null) {
+         executor.execute(() -> doFail(me, message));
       } else {
          doFail(me, message);
       }
@@ -779,11 +782,16 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
       }
       try {
          if (this.getConnectionInfo() != null) {
-            protocolManager.removeConnection(this.getConnectionInfo(), me);
+            protocolManager.removeConnection(getClientID(), this);
          }
-      } catch (InvalidClientIDException e) {
-         logger.warn("Couldn't close connection because invalid clientID", e);
       } finally {
+         try {
+            disconnect(false);
+         } catch (Throwable e) {
+            // it should never happen, but never say never
+            logger.debug("OpenWireConnection::disconnect failure", e);
+         }
+
          // there may be some transactions not associated with sessions
          // deal with them after sessions are removed via connection removal
          operationContext.executeOnCompletion(new IOCallback() {
@@ -876,29 +884,6 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
       internalSession = 
server.createSession(UUIDGenerator.getInstance().generateStringUUID(), 
context.getUserName(), info.getPassword(), -1, this, true, false, false, false, 
null, null, true, operationContext, protocolManager.getPrefixes(), 
protocolManager.getSecurityDomain(), validatedUser, false);
    }
 
-   //raise the refCount of context
-   public void reconnect(AMQConnectionContext existingContext, ConnectionInfo 
info) throws Exception {
-      this.context = existingContext;
-      WireFormatInfo wireFormatInfo = inWireFormat.getPreferedWireFormatInfo();
-      // Older clients should have been defaulting this field to true.. but
-      // they were not.
-      if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
-         info.setClientMaster(true);
-      }
-      if (info.getClientIp() == null) {
-         info.setClientIp(getRemoteAddress());
-      }
-      createInternalSession(info);
-      state = new ConnectionState(info);
-      state.reset(info);
-
-      context.setConnection(this);
-      context.setConnectionState(state);
-      context.setClientMaster(info.isClientMaster());
-      context.setFaultTolerant(info.isFaultTolerant());
-      context.setReconnect(true);
-   }
-
    /**
     * This will answer with commands to the client
     */
@@ -1817,13 +1802,13 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
       @Override
       public Response processRemoveConnection(ConnectionId id, long 
lastDeliveredSequenceId) throws Exception {
          //we let protocol manager to handle connection add/remove
+         for (SessionState sessionState : state.getSessionStates()) {
+            propagateLastSequenceId(sessionState, lastDeliveredSequenceId);
+         }
          try {
-            for (SessionState sessionState : state.getSessionStates()) {
-               propagateLastSequenceId(sessionState, lastDeliveredSequenceId);
-            }
-            protocolManager.removeConnection(state.getInfo(), null);
-         } catch (Throwable e) {
-            // log
+            protocolManager.removeConnection(getClientID(), 
OpenWireConnection.this);
+         } finally {
+            OpenWireConnection.this.disconnect(false);
          }
          return null;
       }
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index f1fd7fb6e1..56ff48552c 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -109,7 +109,7 @@ public class OpenWireProtocolManager  extends 
AbstractProtocolManager<Command, O
 
    private final CopyOnWriteArrayList<OpenWireConnection> connections = new 
CopyOnWriteArrayList<>();
 
-   private final Map<String, AMQConnectionContext> clientIdSet = new 
ConcurrentHashMap<>();
+   private final ConcurrentHashMap<String, OpenWireConnection> clientIdSet = 
new ConcurrentHashMap<>();
 
    private String brokerName;
 
@@ -244,18 +244,9 @@ public class OpenWireProtocolManager  extends 
AbstractProtocolManager<Command, O
       }
    }
 
-   public void removeConnection(ConnectionInfo info, Throwable error) throws 
InvalidClientIDException {
-      String clientId = info.getClientId();
-      if (clientId != null) {
-         AMQConnectionContext context = this.clientIdSet.remove(clientId);
-         if (context != null) {
-            //connection is still there and need to close
-            context.getConnection().disconnect(error != null);
-            this.connections.remove(context.getConnection());
-         }
-      } else {
-         throw new InvalidClientIDException("No clientID specified for 
connection disconnect request");
-      }
+   public void removeConnection(String clientID, OpenWireConnection 
connection) {
+      clientIdSet.remove(clientID, connection);
+      connections.remove(connection);
    }
 
    /*** if set, the OpenWire connection will bypass the tcpReadBuferSize and 
use this value instead.
@@ -421,22 +412,16 @@ public class OpenWireProtocolManager  extends 
AbstractProtocolManager<Command, O
       }
 
       AMQConnectionContext context;
-      context = clientIdSet.get(clientId);
-      if (context != null) {
-         if (info.isFailoverReconnect()) {
-            OpenWireConnection oldConnection = context.getConnection();
-            oldConnection.disconnect(true);
-            connections.remove(oldConnection);
-            connection.reconnect(context, info);
-         } else {
-            throw new InvalidClientIDException("Broker: " + getBrokerName() + 
" - Client: " + clientId + " already connected from " + 
context.getConnection().getRemoteAddress());
+      OpenWireConnection oldConnection = clientIdSet.get(clientId);
+      if (oldConnection != null) {
+         if (!info.isFailoverReconnect()) {
+            throw new InvalidClientIDException("Broker: " + getBrokerName() + 
" - Client: " + clientId + " already connected from " + 
oldConnection.getRemoteAddress());
          }
-      } else {
-         //new connection
-         context = connection.initContext(info);
-         clientIdSet.put(clientId, context);
       }
 
+      context = connection.initContext(info);
+
+      clientIdSet.put(clientId, connection);
       connections.add(connection);
 
       ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
diff --git 
a/tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/client/ConnectionDroppedTest.java
 
b/tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/client/ConnectionDroppedTest.java
index 542343fd80..df5e4f2f63 100644
--- 
a/tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/client/ConnectionDroppedTest.java
+++ 
b/tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/client/ConnectionDroppedTest.java
@@ -41,6 +41,8 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import 
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin;
 import 
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
 import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -498,4 +500,60 @@ public class ConnectionDroppedTest extends 
ActiveMQTestBase {
 
    }
 
+
+   @Test(timeout = 10_000)
+   public void testForceDropOpenWire() throws Throwable {
+      ActiveMQServer server = createServer(true, createDefaultConfig(true));
+      server.start();
+
+      Queue serverQueue = server.createQueue(new 
QueueConfiguration("test-queue").setRoutingType(RoutingType.ANYCAST).setAddress("test-queue").setAutoCreated(false));
+
+      CountDownLatch beforeCreateCalled = new CountDownLatch(1);
+      CountDownLatch goCreateConsumer = new CountDownLatch(1);
+      server.registerBrokerPlugin(new ActiveMQServerConsumerPlugin() {
+         @Override
+         public void afterCreateConsumer(ServerConsumer consumer) throws 
ActiveMQException {
+            if (consumer.getQueue() == serverQueue) {
+               logger.info("Creating a consumer at {}", consumer.getQueue());
+               beforeCreateCalled.countDown();
+               try {
+                  goCreateConsumer.await(5, TimeUnit.MINUTES);
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+            }
+         }
+      });
+
+      ExecutorService executorService = Executors.newFixedThreadPool(1);
+      runAfter(executorService::shutdownNow);
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("OPENWIRE", 
"tcp://localhost:61616");
+
+      CountDownLatch done = new CountDownLatch(1);
+
+      executorService.execute(() -> {
+         try (Connection connection = factory.createConnection();
+              Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+              MessageConsumer consumer = 
session.createConsumer(session.createQueue("test-queue"))) {
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+         } finally {
+            done.countDown();
+         }
+      });
+
+      Assert.assertTrue(beforeCreateCalled.await(5, TimeUnit.MINUTES));
+
+      server.getRemotingService().getConnections().forEach(r -> {
+         r.fail(new ActiveMQException("this is a simulation"));
+      });
+
+      goCreateConsumer.countDown();
+
+      Wait.assertEquals(0, serverQueue::getConsumerCount);
+   }
+
+
+
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FastReconnectOpenWireTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FastReconnectOpenWireTest.java
new file mode 100644
index 0000000000..1a41b32580
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FastReconnectOpenWireTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FastReconnectOpenWireTest extends OpenWireTestBase {
+
+   // change this number to give the test a bit more of spinning
+   private static final int NUM_ITERATIONS = 50;
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   @Override
+   protected void configureAddressSettings(Map<String, AddressSettings> 
addressSettingsMap) {
+      super.configureAddressSettings(addressSettingsMap);
+      // force send to dlq early
+      addressSettingsMap.put("exampleQueue", new 
AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new
 
SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(2));
+      // force send to dlq late
+      addressSettingsMap.put("exampleQueueTwo", new 
AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new
 
SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(-1));
+   }
+
+
+   @Test(timeout = 60_000)
+   public void testFastReconnectCreateConsumerNoErrors() throws Exception {
+
+      final ArrayList<Throwable> errors = new ArrayList<>();
+      SimpleString durableQueue = new SimpleString("exampleQueueTwo");
+      this.server.createQueue(new 
QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST));
+
+      Queue queue = new ActiveMQQueue(durableQueue.toString());
+
+      final ActiveMQConnectionFactory exFact = new 
ActiveMQConnectionFactory("failover:(tcp://localhost:61616?closeAsync=false)?startupMaxReconnectAttempts=-1&maxReconnectAttempts=-1&timeout=5000");
+      exFact.setWatchTopicAdvisories(false);
+      exFact.setConnectResponseTimeout(10000);
+      exFact.setClientID("myID");
+
+      RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+      redeliveryPolicy.setRedeliveryDelay(0);
+      redeliveryPolicy.setMaximumRedeliveries(-1);
+      exFact.setRedeliveryPolicy(redeliveryPolicy);
+
+      publish(1000, durableQueue.toString());
+
+      final AtomicInteger numIterations = new AtomicInteger(NUM_ITERATIONS);
+      ExecutorService executor = Executors.newCachedThreadPool();
+      runAfter(executor::shutdownNow);
+
+      final int concurrent = 2;
+      final CountDownLatch done = new CountDownLatch(concurrent);
+      for (int i = 0; i < concurrent; i++) {
+         executor.execute(() -> {
+            try {
+               while (numIterations.decrementAndGet() > 0) {
+                  try (Connection conn = exFact.createConnection(); Session 
consumerConnectionSession = conn.createSession(Session.SESSION_TRANSACTED); 
MessageConsumer messageConsumer = 
consumerConnectionSession.createConsumer(queue)) {
+
+                     messageConsumer.receiveNoWait();
+
+                     try {
+                        // force a local socket close such that the broker 
sees an exception on the connection and fails the consumer via serverConsumer 
close
+                        ((ActiveMQConnection) 
conn).getTransport().narrow(TcpTransport.class).stop();
+                     } catch (Throwable expected) {
+                     }
+
+                  } catch (javax.jms.InvalidClientIDException expected) {
+                     // deliberate clash across concurrent connections
+                  } catch (Throwable unexpected) {
+                     logger.warn(unexpected.getMessage(), unexpected);
+                     errors.add(unexpected);
+                     numIterations.set(0);
+                  }
+               }
+            } finally {
+               done.countDown();
+            }
+         });
+      }
+
+      assertTrue(done.await(30, TimeUnit.SECONDS));
+
+      Wait.assertEquals(0, () -> 
server.locateQueue(durableQueue).getConsumerCount(), 5000);
+      assertTrue(errors.isEmpty());
+
+   }
+
+   @Test(timeout = 60_000)
+   public void testFastReconnectCreateConsumerNoErrorsNoClientId() throws 
Exception {
+
+      final ArrayList<Throwable> errors = new ArrayList<>();
+      SimpleString durableQueue = new SimpleString("exampleQueueTwo");
+      this.server.createQueue(new 
QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST));
+
+      Queue queue = new ActiveMQQueue(durableQueue.toString());
+
+      final ActiveMQConnectionFactory exFact = new 
ActiveMQConnectionFactory("failover:(tcp://localhost:61616?closeAsync=false)?startupMaxReconnectAttempts=-1&maxReconnectAttempts=-1&timeout=5000");
+      exFact.setWatchTopicAdvisories(false);
+      exFact.setConnectResponseTimeout(10000);
+
+      RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+      redeliveryPolicy.setRedeliveryDelay(0);
+      redeliveryPolicy.setMaximumRedeliveries(-1);
+      exFact.setRedeliveryPolicy(redeliveryPolicy);
+
+      publish(1000, durableQueue.toString());
+
+      final AtomicInteger numIterations = new AtomicInteger(NUM_ITERATIONS);
+      ExecutorService executor = Executors.newCachedThreadPool();
+      runAfter(executor::shutdownNow);
+
+      final int concurrent = 2;
+      CountDownLatch done = new CountDownLatch(concurrent);
+      for (int i = 0; i < concurrent; i++) {
+         executor.execute(() -> {
+            try (Connection conn = exFact.createConnection();
+                 Session consumerConnectionSession = 
conn.createSession(Session.SESSION_TRANSACTED);
+                 MessageConsumer messageConsumer = 
consumerConnectionSession.createConsumer(queue)
+            ) {
+               conn.start();
+               while (numIterations.decrementAndGet() > 0) {
+                  try {
+                     messageConsumer.receiveNoWait();
+
+                     try {
+                        // force a local socket close such that the broker 
sees an exception on the connection and fails the consumer via serverConsumer 
close
+                        ((ActiveMQConnection) 
conn).getTransport().narrow(TcpTransport.class).stop();
+                     } catch (Throwable expected) {
+                     }
+                  } catch (Throwable unexpected) {
+                     errors.add(unexpected);
+                  }
+               }
+            } catch (Throwable unexpected) {
+               numIterations.set(0);
+               unexpected.printStackTrace();
+               errors.add(unexpected);
+            } finally {
+               done.countDown();
+            }
+         });
+      }
+
+      assertTrue(done.await(30, TimeUnit.SECONDS));
+
+      Wait.assertEquals(0, () -> 
server.locateQueue(durableQueue).getConsumerCount(), 5000);
+
+      assertTrue(errors.isEmpty());
+
+   }
+
+   private void publish(int numMessages, String name) throws Exception {
+      final ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
+      Connection exConn = exFact.createConnection();
+      exConn.start();
+
+      Queue queue = new ActiveMQQueue(name);
+
+      // put a few messages on the queue to have the broker do some dispatch 
work
+      Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer producer = session.createProducer(queue);
+
+      TextMessage message = session.createTextMessage("This is a text 
message");
+      for (int i = 0; i < numMessages; i++) {
+         message.setIntProperty("SEQ", i);
+         producer.send(message);
+      }
+      session.close();
+      exConn.close();
+
+   }
+}
diff --git 
a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
 
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
index dc926e517c..0345e1c7b2 100644
--- 
a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
+++ 
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
@@ -47,6 +47,8 @@ public class MemoryAssertions {
       assertMemory(checkLeak, 0, ProtonServerReceiverContext.class.getName());
       assertMemory(checkLeak, 0, 
ActiveMQProtonRemotingConnection.class.getName());
       assertMemory(checkLeak, 0, RemotingConnectionImpl.class.getName());
+      assertMemory(checkLeak, 0, OpenWireConnection.class.getName());
+      assertMemory(checkLeak, 0, 
ActiveMQProtonRemotingConnection.class.getName());
       assertMemory(checkLeak, 0, ServerSessionImpl.class.getName());
       assertMemory(checkLeak, 0, AMQPSessionContext.class.getName());
       assertMemory(checkLeak, 0, ServerConsumerImpl.class.getName());

Reply via email to