This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 8422c77c7d ARTEMIS-4173 Improving reliability on RandomReattachTest
8422c77c7d is described below

commit 8422c77c7dd2611a769147864a0f6fcd09caf6f9
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Dec 5 11:14:05 2024 -0500

    ARTEMIS-4173 Improving reliability on RandomReattachTest
    
    - making the test to run faster and removing the soak and stress versions 
as they are not really needed
    - allowing expected exceptions (cannot find ACK, and making it retry the 
operation)
    - removing duplicates of the test that are not really needed.
---
 .../cluster/reattach/RandomReattachTestBase.java   | 1317 -------------------
 .../reattach/RandomReattachIntegrationTest.java    | 1337 +++++++++++++++++++-
 .../soak/failover/RandomFailoverSoakTest.java      |   33 -
 .../stress/failover/RandomReattachStressTest.java  |   34 -
 4 files changed, 1334 insertions(+), 1387 deletions(-)

diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/RandomReattachTestBase.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/RandomReattachTestBase.java
deleted file mode 100644
index d58eb97aef..0000000000
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/RandomReattachTestBase.java
+++ /dev/null
@@ -1,1317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.cluster.reattach;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
-import org.apache.activemq.artemis.api.core.QueueConfiguration;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.client.ClientConsumer;
-import org.apache.activemq.artemis.api.core.client.ClientMessage;
-import org.apache.activemq.artemis.api.core.client.ClientProducer;
-import org.apache.activemq.artemis.api.core.client.ClientSession;
-import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
-import org.apache.activemq.artemis.api.core.client.MessageHandler;
-import org.apache.activemq.artemis.api.core.client.ServerLocator;
-import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
-import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
-import org.apache.activemq.artemis.core.remoting.impl.invm.InVMRegistry;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.ActiveMQServers;
-import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
-
-public abstract class RandomReattachTestBase extends ActiveMQTestBase {
-
-   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-
-   private static final int RECEIVE_TIMEOUT = 10000;
-
-
-   private static final SimpleString ADDRESS = 
SimpleString.of("FailoverTestAddress");
-
-   private ActiveMQServer server;
-
-   private Timer timer;
-
-   protected abstract int getNumIterations();
-
-   @Test
-   public void testA() throws Exception {
-      runTest(new RunnableT() {
-         @Override
-         public void run(final ClientSessionFactory sf) throws Exception {
-            doTestA(sf);
-         }
-      });
-   }
-
-   @Test
-   public void testB() throws Exception {
-      runTest(new RunnableT() {
-         @Override
-         public void run(final ClientSessionFactory sf) throws Exception {
-            doTestB(sf);
-         }
-      });
-   }
-
-   @Test
-   public void testC() throws Exception {
-      runTest(new RunnableT() {
-         @Override
-         public void run(final ClientSessionFactory sf) throws Exception {
-            doTestC(sf);
-         }
-      });
-   }
-
-   @Test
-   public void testD() throws Exception {
-      runTest(new RunnableT() {
-         @Override
-         public void run(final ClientSessionFactory sf) throws Exception {
-            doTestD(sf);
-         }
-      });
-   }
-
-   @Test
-   public void testE() throws Exception {
-      runTest(new RunnableT() {
-         @Override
-         public void run(final ClientSessionFactory sf) throws Exception {
-            doTestE(sf);
-         }
-      });
-   }
-
-   @Test
-   public void testF() throws Exception {
-      runTest(new RunnableT() {
-         @Override
-         public void run(final ClientSessionFactory sf) throws Exception {
-            doTestF(sf);
-         }
-      });
-   }
-
-   @Test
-   public void testG() throws Exception {
-      runTest(new RunnableT() {
-         @Override
-         public void run(final ClientSessionFactory sf) throws Exception {
-            doTestG(sf);
-         }
-      });
-   }
-
-   @Test
-   public void testH() throws Exception {
-      runTest(new RunnableT() {
-         @Override
-         public void run(final ClientSessionFactory sf) throws Exception {
-            doTestH(sf);
-         }
-      });
-   }
-
-   @Test
-   public void testI() throws Exception {
-      runTest(new RunnableT() {
-         @Override
-         public void run(final ClientSessionFactory sf) throws Exception {
-            doTestI(sf);
-         }
-      });
-   }
-
-   @Test
-   public void testJ() throws Exception {
-      runTest(new RunnableT() {
-         @Override
-         public void run(final ClientSessionFactory sf) throws Exception {
-            doTestJ(sf);
-         }
-      });
-   }
-
-   @Test
-   public void testK() throws Exception {
-      runTest(new RunnableT() {
-         @Override
-         public void run(final ClientSessionFactory sf) throws Exception {
-            doTestK(sf);
-         }
-      });
-   }
-
-   @Test
-   public void testL() throws Exception {
-      runTest(new RunnableT() {
-         @Override
-         public void run(final ClientSessionFactory sf) throws Exception {
-            doTestL(sf);
-         }
-      });
-   }
-
-   @Test
-   public void testN() throws Exception {
-      runTest(new RunnableT() {
-         @Override
-         public void run(final ClientSessionFactory sf) throws Exception {
-            doTestN(sf);
-         }
-      });
-   }
-
-   public void runTest(final RunnableT runnable) throws Exception {
-      final int numIts = getNumIterations();
-
-      for (int its = 0; its < numIts; its++) {
-         logger.debug("####{} iteration #{}", getName(), its);
-         start();
-         ServerLocator locator = 
createInVMNonHALocator().setReconnectAttempts(15).setConfirmationWindowSize(1024
 * 1024);
-
-         ClientSessionFactory sf = createSessionFactory(locator);
-
-         ClientSession session = sf.createSession(false, false, false);
-
-         Failer failer = startFailer(1000, session);
-
-         do {
-            runnable.run(sf);
-         }
-         while (!failer.isExecuted());
-      }
-   }
-
-
-
-   protected void doTestA(final ClientSessionFactory sf) throws Exception {
-      long start = System.currentTimeMillis();
-
-      ClientSession s = sf.createSession(false, false, false);
-
-      final int numMessages = 100;
-
-      final int numSessions = 10;
-
-      Set<ClientConsumer> consumers = new HashSet<>();
-      Set<ClientSession> sessions = new HashSet<>();
-
-      for (int i = 0; i < numSessions; i++) {
-         SimpleString subName = SimpleString.of("sub" + i);
-
-         ClientSession sessConsume = sf.createSession(false, true, true);
-
-         sessConsume.start();
-
-         
sessConsume.createQueue(QueueConfiguration.of(subName).setAddress(ADDRESS).setDurable(false));
-
-         ClientConsumer consumer = sessConsume.createConsumer(subName);
-
-         consumers.add(consumer);
-
-         sessions.add(sessConsume);
-      }
-
-      ClientSession sessSend = sf.createSession(false, true, true);
-
-      ClientProducer producer = sessSend.createProducer(ADDRESS);
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
-         message.putIntProperty(SimpleString.of("count"), i);
-         producer.send(message);
-      }
-
-      class MyHandler extends AssertionCheckMessageHandler {
-
-         final CountDownLatch latch = new CountDownLatch(1);
-
-         int count;
-
-         @Override
-         public void onMessageAssert(final ClientMessage message) {
-            if (count == numMessages) {
-               fail("Too many messages");
-            }
-
-            assertEquals(count, 
message.getObjectProperty(SimpleString.of("count")));
-
-            count++;
-
-            try {
-               message.acknowledge();
-            } catch (ActiveMQException me) {
-               logger.error("Failed to process", me);
-            }
-
-            if (count == numMessages) {
-               latch.countDown();
-            }
-         }
-      }
-
-      Set<MyHandler> handlers = new HashSet<>();
-
-      for (ClientConsumer consumer : consumers) {
-         MyHandler handler = new MyHandler();
-
-         consumer.setMessageHandler(handler);
-
-         handlers.add(handler);
-      }
-
-      for (MyHandler handler : handlers) {
-         boolean ok = handler.latch.await(5000, TimeUnit.MILLISECONDS);
-
-         handler.checkAssertions();
-
-         assertTrue(ok, "Didn't receive all messages");
-      }
-
-      sessSend.close();
-      for (ClientSession session : sessions) {
-         session.close();
-      }
-
-      for (int i = 0; i < numSessions; i++) {
-         SimpleString subName = SimpleString.of("sub" + i);
-
-         s.deleteQueue(subName);
-      }
-
-      s.close();
-
-      long end = System.currentTimeMillis();
-
-      logger.debug("duration {}", (end - start));
-   }
-
-   protected void doTestB(final ClientSessionFactory sf) throws Exception {
-      long start = System.currentTimeMillis();
-
-      ClientSession s = sf.createSession(false, false, false);
-
-      final int numMessages = 100;
-
-      final int numSessions = 50;
-
-      Set<ClientConsumer> consumers = new HashSet<>();
-      Set<ClientSession> sessions = new HashSet<>();
-
-      for (int i = 0; i < numSessions; i++) {
-         SimpleString subName = SimpleString.of("sub" + i);
-
-         ClientSession sessConsume = sf.createSession(false, true, true);
-
-         
sessConsume.createQueue(QueueConfiguration.of(subName).setAddress(ADDRESS).setDurable(false));
-
-         ClientConsumer consumer = sessConsume.createConsumer(subName);
-
-         consumers.add(consumer);
-
-         sessions.add(sessConsume);
-      }
-
-      ClientSession sessSend = sf.createSession(false, true, true);
-
-      ClientProducer producer = sessSend.createProducer(ADDRESS);
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
-         message.putIntProperty(SimpleString.of("count"), i);
-         producer.send(message);
-      }
-
-      for (ClientSession session : sessions) {
-         session.start();
-      }
-
-      class MyHandler extends AssertionCheckMessageHandler {
-
-         final CountDownLatch latch = new CountDownLatch(1);
-
-         int count;
-
-         @Override
-         public void onMessageAssert(final ClientMessage message) {
-            if (count == numMessages) {
-               fail("Too many messages");
-            }
-
-            assertEquals(count, 
message.getObjectProperty(SimpleString.of("count")));
-
-            count++;
-
-            if (count == numMessages) {
-               latch.countDown();
-            }
-         }
-      }
-
-      Set<MyHandler> handlers = new HashSet<>();
-
-      for (ClientConsumer consumer : consumers) {
-         MyHandler handler = new MyHandler();
-
-         consumer.setMessageHandler(handler);
-
-         handlers.add(handler);
-      }
-
-      for (MyHandler handler : handlers) {
-         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
-
-         handler.checkAssertions();
-
-         assertTrue(ok);
-      }
-
-      sessSend.close();
-
-      for (ClientSession session : sessions) {
-         session.close();
-      }
-
-      for (int i = 0; i < numSessions; i++) {
-         SimpleString subName = SimpleString.of("sub" + i);
-
-         s.deleteQueue(subName);
-      }
-
-      s.close();
-
-      long end = System.currentTimeMillis();
-
-      logger.debug("duration {}", (end - start));
-
-   }
-
-   protected void doTestC(final ClientSessionFactory sf) throws Exception {
-      long start = System.currentTimeMillis();
-
-      ClientSession s = sf.createSession(false, false, false);
-
-      final int numMessages = 100;
-
-      final int numSessions = 1;
-
-      Set<ClientConsumer> consumers = new HashSet<>();
-      Set<ClientSession> sessions = new HashSet<>();
-
-      for (int i = 0; i < numSessions; i++) {
-         SimpleString subName = SimpleString.of("sub" + i);
-
-         ClientSession sessConsume = sf.createSession(false, false, false);
-
-         sessConsume.start();
-
-         
sessConsume.createQueue(QueueConfiguration.of(subName).setAddress(ADDRESS).setDurable(false));
-
-         ClientConsumer consumer = sessConsume.createConsumer(subName);
-
-         consumers.add(consumer);
-
-         sessions.add(sessConsume);
-      }
-
-      ClientSession sessSend = sf.createSession(false, false, true);
-
-      ClientProducer producer = sessSend.createProducer(ADDRESS);
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
-         message.putIntProperty(SimpleString.of("count"), i);
-         producer.send(message);
-      }
-
-      sessSend.rollback();
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
-         message.putIntProperty(SimpleString.of("count"), i);
-         producer.send(message);
-      }
-
-      sessSend.commit();
-
-      class MyHandler extends AssertionCheckMessageHandler {
-
-         final CountDownLatch latch = new CountDownLatch(1);
-
-         int count;
-
-         @Override
-         public void onMessageAssert(final ClientMessage message) {
-            if (count == numMessages) {
-               fail("Too many messages, expected " + count);
-            }
-
-            assertEquals(count, 
message.getObjectProperty(SimpleString.of("count")));
-
-            count++;
-
-            try {
-               message.acknowledge();
-            } catch (ActiveMQException e) {
-               e.printStackTrace();
-               throw new RuntimeException(e.getMessage(), e);
-            }
-
-            if (count == numMessages) {
-               latch.countDown();
-            }
-         }
-      }
-
-      Set<MyHandler> handlers = new HashSet<>();
-
-      for (ClientConsumer consumer : consumers) {
-         MyHandler handler = new MyHandler();
-
-         consumer.setMessageHandler(handler);
-
-         handlers.add(handler);
-      }
-
-      for (MyHandler handler : handlers) {
-         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
-
-         assertTrue(ok);
-
-         handler.checkAssertions();
-      }
-
-      handlers.clear();
-
-      // New handlers
-      for (ClientConsumer consumer : consumers) {
-         MyHandler handler = new MyHandler();
-
-         consumer.setMessageHandler(handler);
-
-         handlers.add(handler);
-      }
-
-      for (ClientSession session : sessions) {
-         session.rollback();
-      }
-
-      for (MyHandler handler : handlers) {
-         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
-
-         assertTrue(ok);
-
-         handler.checkAssertions();
-      }
-
-      for (ClientSession session : sessions) {
-         session.commit();
-      }
-
-      sessSend.close();
-      for (ClientSession session : sessions) {
-         session.close();
-      }
-
-      for (int i = 0; i < numSessions; i++) {
-         SimpleString subName = SimpleString.of("sub" + i);
-
-         s.deleteQueue(subName);
-      }
-
-      s.close();
-
-      long end = System.currentTimeMillis();
-
-      logger.debug("duration {}", (end - start));
-   }
-
-   protected void doTestD(final ClientSessionFactory sf) throws Exception {
-      long start = System.currentTimeMillis();
-
-      ClientSession s = sf.createSession(false, false, false);
-
-      final int numMessages = 100;
-
-      final int numSessions = 10;
-
-      Set<ClientConsumer> consumers = new HashSet<>();
-      Set<ClientSession> sessions = new HashSet<>();
-
-      for (int i = 0; i < numSessions; i++) {
-         SimpleString subName = SimpleString.of("sub" + i);
-
-         ClientSession sessConsume = sf.createSession(false, false, false);
-
-         
sessConsume.createQueue(QueueConfiguration.of(subName).setAddress(ADDRESS).setDurable(false));
-
-         ClientConsumer consumer = sessConsume.createConsumer(subName);
-
-         consumers.add(consumer);
-
-         sessions.add(sessConsume);
-      }
-
-      ClientSession sessSend = sf.createSession(false, false, true);
-
-      ClientProducer producer = sessSend.createProducer(ADDRESS);
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
-         message.putIntProperty(SimpleString.of("count"), i);
-         producer.send(message);
-      }
-
-      sessSend.rollback();
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
-         message.putIntProperty(SimpleString.of("count"), i);
-         producer.send(message);
-      }
-
-      sessSend.commit();
-
-      for (ClientSession session : sessions) {
-         session.start();
-      }
-
-      class MyHandler extends AssertionCheckMessageHandler {
-
-         final CountDownLatch latch = new CountDownLatch(1);
-
-         int count;
-
-         @Override
-         public void onMessageAssert(final ClientMessage message) {
-            if (count == numMessages) {
-               fail("Too many messages, " + count);
-            }
-
-            assertEquals(count, 
message.getObjectProperty(SimpleString.of("count")));
-
-            count++;
-
-            if (count == numMessages) {
-               latch.countDown();
-            }
-         }
-      }
-
-      Set<MyHandler> handlers = new HashSet<>();
-
-      for (ClientConsumer consumer : consumers) {
-         MyHandler handler = new MyHandler();
-
-         consumer.setMessageHandler(handler);
-
-         handlers.add(handler);
-      }
-
-      for (MyHandler handler : handlers) {
-         boolean ok = handler.latch.await(20000, TimeUnit.MILLISECONDS);
-
-         assertTrue(ok);
-
-         handler.checkAssertions();
-      }
-
-      handlers.clear();
-
-      // New handlers
-      for (ClientConsumer consumer : consumers) {
-         MyHandler handler = new MyHandler();
-
-         consumer.setMessageHandler(handler);
-
-         handlers.add(handler);
-      }
-
-      for (ClientSession session : sessions) {
-         session.rollback();
-      }
-
-      for (MyHandler handler : handlers) {
-         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
-
-         assertTrue(ok);
-
-         handler.checkAssertions();
-      }
-
-      for (ClientSession session : sessions) {
-         session.commit();
-      }
-
-      sessSend.close();
-      for (ClientSession session : sessions) {
-         session.close();
-      }
-
-      for (int i = 0; i < numSessions; i++) {
-         SimpleString subName = SimpleString.of("sub" + i);
-
-         s.deleteQueue(subName);
-      }
-
-      s.close();
-
-      long end = System.currentTimeMillis();
-
-      logger.debug("duration {}", (end - start));
-   }
-
-   // Now with synchronous receive()
-
-   protected void doTestE(final ClientSessionFactory sf) throws Exception {
-      long start = System.currentTimeMillis();
-
-      ClientSession s = sf.createSession(false, false, false);
-
-      final int numMessages = 100;
-
-      final int numSessions = 10;
-
-      Set<ClientConsumer> consumers = new HashSet<>();
-      Set<ClientSession> sessions = new HashSet<>();
-
-      for (int i = 0; i < numSessions; i++) {
-         SimpleString subName = SimpleString.of("sub" + i);
-
-         ClientSession sessConsume = sf.createSession(false, true, true);
-
-         sessConsume.start();
-
-         
sessConsume.createQueue(QueueConfiguration.of(subName).setAddress(ADDRESS).setDurable(false));
-
-         ClientConsumer consumer = sessConsume.createConsumer(subName);
-
-         consumers.add(consumer);
-
-         sessions.add(sessConsume);
-      }
-
-      ClientSession sessSend = sf.createSession(false, true, true);
-
-      ClientProducer producer = sessSend.createProducer(ADDRESS);
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
-         message.putIntProperty(SimpleString.of("count"), i);
-         producer.send(message);
-      }
-
-      for (int i = 0; i < numMessages; i++) {
-         for (ClientConsumer consumer : consumers) {
-            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
-
-            assertNotNull(msg);
-
-            assertEquals(i, msg.getObjectProperty(SimpleString.of("count")));
-
-            msg.acknowledge();
-         }
-      }
-
-      for (int i = 0; i < numMessages; i++) {
-         for (ClientConsumer consumer : consumers) {
-            ClientMessage msg = consumer.receiveImmediate();
-
-            assertNull(msg);
-         }
-      }
-
-      sessSend.close();
-      for (ClientSession session : sessions) {
-         session.close();
-      }
-
-      for (int i = 0; i < numSessions; i++) {
-         SimpleString subName = SimpleString.of("sub" + i);
-
-         s.deleteQueue(subName);
-      }
-
-      s.close();
-
-      long end = System.currentTimeMillis();
-
-      logger.debug("duration {}", (end - start));
-   }
-
-   protected void doTestF(final ClientSessionFactory sf) throws Exception {
-      long start = System.currentTimeMillis();
-
-      ClientSession s = sf.createSession(false, false, false);
-
-      final int numMessages = 100;
-
-      final int numSessions = 10;
-
-      Set<ClientConsumer> consumers = new HashSet<>();
-      Set<ClientSession> sessions = new HashSet<>();
-
-      for (int i = 0; i < numSessions; i++) {
-         SimpleString subName = SimpleString.of("sub" + i);
-
-         ClientSession sessConsume = sf.createSession(false, true, true);
-
-         
sessConsume.createQueue(QueueConfiguration.of(subName).setAddress(ADDRESS).setDurable(false));
-
-         ClientConsumer consumer = sessConsume.createConsumer(subName);
-
-         consumers.add(consumer);
-
-         sessions.add(sessConsume);
-      }
-
-      ClientSession sessSend = sf.createSession(false, true, true);
-
-      ClientProducer producer = sessSend.createProducer(ADDRESS);
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
-         message.putIntProperty(SimpleString.of("count"), i);
-         producer.send(message);
-      }
-
-      for (ClientSession session : sessions) {
-         session.start();
-      }
-
-      for (int i = 0; i < numMessages; i++) {
-         for (ClientConsumer consumer : consumers) {
-            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
-
-            if (msg == null) {
-               throw new IllegalStateException("Failed to receive message " + 
i);
-            }
-
-            assertNotNull(msg);
-
-            assertEquals(i, msg.getObjectProperty(SimpleString.of("count")));
-
-            msg.acknowledge();
-         }
-      }
-
-      for (int i = 0; i < numMessages; i++) {
-         for (ClientConsumer consumer : consumers) {
-            ClientMessage msg = consumer.receiveImmediate();
-
-            assertNull(msg);
-         }
-      }
-
-      sessSend.close();
-      for (ClientSession session : sessions) {
-         session.close();
-      }
-
-      for (int i = 0; i < numSessions; i++) {
-         SimpleString subName = SimpleString.of("sub" + i);
-
-         s.deleteQueue(subName);
-      }
-
-      s.close();
-
-      assertEquals(1, ((ClientSessionFactoryImpl) sf).numSessions());
-
-      long end = System.currentTimeMillis();
-
-      logger.debug("duration {}", (end - start));
-   }
-
-   protected void doTestG(final ClientSessionFactory sf) throws Exception {
-      long start = System.currentTimeMillis();
-
-      ClientSession s = sf.createSession(false, false, false);
-
-      final int numMessages = 100;
-
-      final int numSessions = 10;
-
-      Set<ClientConsumer> consumers = new HashSet<>();
-      Set<ClientSession> sessions = new HashSet<>();
-
-      for (int i = 0; i < numSessions; i++) {
-         SimpleString subName = SimpleString.of("sub" + i);
-
-         ClientSession sessConsume = sf.createSession(false, false, false);
-
-         sessConsume.start();
-
-         
sessConsume.createQueue(QueueConfiguration.of(subName).setAddress(ADDRESS).setDurable(false));
-
-         ClientConsumer consumer = sessConsume.createConsumer(subName);
-
-         consumers.add(consumer);
-
-         sessions.add(sessConsume);
-      }
-
-      ClientSession sessSend = sf.createSession(false, false, false);
-
-      ClientProducer producer = sessSend.createProducer(ADDRESS);
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
-         message.putIntProperty(SimpleString.of("count"), i);
-         producer.send(message);
-      }
-
-      sessSend.rollback();
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
-         message.putIntProperty(SimpleString.of("count"), i);
-         producer.send(message);
-      }
-
-      sessSend.commit();
-
-      for (int i = 0; i < numMessages; i++) {
-         for (ClientConsumer consumer : consumers) {
-            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
-
-            assertNotNull(msg);
-
-            assertEquals(i, msg.getObjectProperty(SimpleString.of("count")));
-
-            msg.acknowledge();
-         }
-      }
-
-      for (ClientConsumer consumer : consumers) {
-         ClientMessage msg = consumer.receiveImmediate();
-
-         assertNull(msg);
-      }
-
-      for (ClientSession session : sessions) {
-         session.rollback();
-      }
-
-      for (int i = 0; i < numMessages; i++) {
-         for (ClientConsumer consumer : consumers) {
-            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
-
-            assertNotNull(msg);
-
-            assertEquals(i, msg.getObjectProperty(SimpleString.of("count")));
-
-            msg.acknowledge();
-         }
-      }
-
-      for (int i = 0; i < numMessages; i++) {
-         for (ClientConsumer consumer : consumers) {
-            ClientMessage msg = consumer.receiveImmediate();
-
-            assertNull(msg);
-         }
-      }
-
-      for (ClientSession session : sessions) {
-         session.commit();
-      }
-
-      sessSend.close();
-      for (ClientSession session : sessions) {
-         session.close();
-      }
-
-      for (int i = 0; i < numSessions; i++) {
-         SimpleString subName = SimpleString.of("sub" + i);
-
-         s.deleteQueue(subName);
-      }
-
-      s.close();
-
-      long end = System.currentTimeMillis();
-
-      logger.debug("duration {}", (end - start));
-   }
-
-   protected void doTestH(final ClientSessionFactory sf) throws Exception {
-      long start = System.currentTimeMillis();
-
-      ClientSession s = sf.createSession(false, false, false);
-
-      final int numMessages = 100;
-
-      final int numSessions = 10;
-
-      Set<ClientConsumer> consumers = new HashSet<>();
-      Set<ClientSession> sessions = new HashSet<>();
-
-      for (int i = 0; i < numSessions; i++) {
-         SimpleString subName = SimpleString.of("sub" + i);
-
-         ClientSession sessConsume = sf.createSession(false, false, false);
-
-         
sessConsume.createQueue(QueueConfiguration.of(subName).setAddress(ADDRESS).setDurable(false));
-
-         ClientConsumer consumer = sessConsume.createConsumer(subName);
-
-         consumers.add(consumer);
-
-         sessions.add(sessConsume);
-      }
-
-      ClientSession sessSend = sf.createSession(false, false, false);
-
-      ClientProducer producer = sessSend.createProducer(ADDRESS);
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
-         message.putIntProperty(SimpleString.of("count"), i);
-         producer.send(message);
-      }
-
-      sessSend.rollback();
-
-      for (int i = 0; i < numMessages; i++) {
-         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
-         message.putIntProperty(SimpleString.of("count"), i);
-         producer.send(message);
-      }
-
-      sessSend.commit();
-
-      for (ClientSession session : sessions) {
-         session.start();
-      }
-
-      for (int i = 0; i < numMessages; i++) {
-         for (ClientConsumer consumer : consumers) {
-            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
-
-            assertNotNull(msg);
-
-            assertEquals(i, msg.getObjectProperty(SimpleString.of("count")));
-
-            msg.acknowledge();
-         }
-      }
-
-      for (int i = 0; i < numMessages; i++) {
-         for (ClientConsumer consumer : consumers) {
-            ClientMessage msg = consumer.receiveImmediate();
-
-            assertNull(msg);
-         }
-      }
-
-      for (ClientSession session : sessions) {
-         session.rollback();
-      }
-
-      for (int i = 0; i < numMessages; i++) {
-         for (ClientConsumer consumer : consumers) {
-            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
-
-            assertNotNull(msg);
-
-            assertEquals(i, msg.getObjectProperty(SimpleString.of("count")));
-
-            msg.acknowledge();
-         }
-      }
-
-      for (int i = 0; i < numMessages; i++) {
-         for (ClientConsumer consumer : consumers) {
-            ClientMessage msg = consumer.receiveImmediate();
-
-            assertNull(msg);
-         }
-      }
-
-      for (ClientSession session : sessions) {
-         session.commit();
-      }
-
-      sessSend.close();
-      for (ClientSession session : sessions) {
-         session.close();
-      }
-
-      for (int i = 0; i < numSessions; i++) {
-         SimpleString subName = SimpleString.of("sub" + i);
-
-         s.deleteQueue(subName);
-      }
-
-      s.close();
-
-      long end = System.currentTimeMillis();
-
-      logger.debug("duration {}", (end - start));
-   }
-
-   protected void doTestI(final ClientSessionFactory sf) throws Exception {
-      ClientSession sessCreate = sf.createSession(false, true, true);
-
-      sessCreate.createQueue(QueueConfiguration.of(ADDRESS).setDurable(false));
-
-      ClientSession sess = sf.createSession(false, true, true);
-
-      sess.start();
-
-      ClientConsumer consumer = sess.createConsumer(ADDRESS);
-
-      ClientProducer producer = sess.createProducer(ADDRESS);
-
-      ClientMessage message = sess.createMessage(ActiveMQTextMessage.TYPE, 
false, 0, System.currentTimeMillis(), (byte) 1);
-      producer.send(message);
-
-      ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
-
-      assertNotNull(message2);
-
-      message2.acknowledge();
-
-      sess.close();
-
-      sessCreate.deleteQueue(ADDRESS);
-
-      sessCreate.close();
-   }
-
-   protected void doTestJ(final ClientSessionFactory sf) throws Exception {
-      ClientSession sessCreate = sf.createSession(false, true, true);
-
-      sessCreate.createQueue(QueueConfiguration.of(ADDRESS).setDurable(false));
-
-      ClientSession sess = sf.createSession(false, true, true);
-
-      sess.start();
-
-      ClientConsumer consumer = sess.createConsumer(ADDRESS);
-
-      ClientProducer producer = sess.createProducer(ADDRESS);
-
-      ClientMessage message = sess.createMessage(ActiveMQTextMessage.TYPE, 
false, 0, System.currentTimeMillis(), (byte) 1);
-      producer.send(message);
-
-      ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
-
-      assertNotNull(message2);
-
-      message2.acknowledge();
-
-      sess.close();
-
-      sessCreate.deleteQueue(ADDRESS);
-
-      sessCreate.close();
-   }
-
-   protected void doTestK(final ClientSessionFactory sf) throws Exception {
-      ClientSession s = sf.createSession(false, false, false);
-
-      s.createQueue(QueueConfiguration.of(ADDRESS).setDurable(false));
-
-      final int numConsumers = 100;
-
-      for (int i = 0; i < numConsumers; i++) {
-         ClientConsumer consumer = s.createConsumer(ADDRESS);
-
-         consumer.close();
-      }
-
-      s.deleteQueue(ADDRESS);
-
-      s.close();
-   }
-
-   protected void doTestL(final ClientSessionFactory sf) throws Exception {
-      final int numSessions = 10;
-
-      for (int i = 0; i < numSessions; i++) {
-         ClientSession session = sf.createSession(false, false, false);
-
-         session.close();
-      }
-   }
-
-   protected void doTestN(final ClientSessionFactory sf) throws Exception {
-      ClientSession sessCreate = sf.createSession(false, true, true);
-
-      sessCreate.createQueue(QueueConfiguration.of(ADDRESS).setDurable(false));
-
-      ClientSession sess = sf.createSession(false, true, true);
-
-      sess.stop();
-
-      sess.start();
-
-      sess.stop();
-
-      ClientConsumer consumer = sess.createConsumer(ADDRESS);
-
-      ClientProducer producer = sess.createProducer(ADDRESS);
-
-      ClientMessage message = sess.createMessage(ActiveMQTextMessage.TYPE, 
false, 0, System.currentTimeMillis(), (byte) 1);
-      producer.send(message);
-
-      sess.start();
-
-      ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
-
-      assertNotNull(message2);
-
-      message2.acknowledge();
-
-      sess.stop();
-
-      sess.start();
-
-      sess.close();
-
-      sessCreate.deleteQueue(SimpleString.of(ADDRESS.toString()));
-
-      sessCreate.close();
-   }
-
-   @Override
-   @BeforeEach
-   public void setUp() throws Exception {
-      super.setUp();
-
-      timer = new Timer(true);
-   }
-
-   @Override
-   @AfterEach
-   public void tearDown() throws Exception {
-      timer.cancel();
-
-      super.tearDown();
-   }
-
-
-   private Failer startFailer(final long time, final ClientSession session) {
-      Failer failer = new Failer((ClientSessionInternal) session);
-
-      timer.schedule(failer, (long) (time * Math.random()), 100);
-
-      return failer;
-   }
-
-   private void start() throws Exception {
-      server = 
addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig(), false));
-      server.start();
-   }
-
-   private void stop() throws Exception {
-      server.stop();
-
-      assertEquals(0, InVMRegistry.instance.size());
-
-      server = null;
-   }
-
-
-   class Failer extends TimerTask {
-
-      private final ClientSessionInternal session;
-
-      private boolean executed;
-
-      Failer(final ClientSessionInternal session) {
-         this.session = session;
-      }
-
-      @Override
-      public synchronized void run() {
-         logger.debug("** Failing connection");
-
-         session.getConnection().fail(new 
ActiveMQNotConnectedException("oops"));
-
-         logger.debug("** Fail complete");
-
-         cancel();
-
-         executed = true;
-      }
-
-      public synchronized boolean isExecuted() {
-         return executed;
-      }
-   }
-
-   public abstract class RunnableT {
-
-      abstract void run(ClientSessionFactory sf) throws Exception;
-   }
-
-   abstract static class AssertionCheckMessageHandler implements 
MessageHandler {
-
-      public void checkAssertions() {
-         for (AssertionError e : errors) {
-            // it will throw the first error
-            throw e;
-         }
-      }
-
-      private final ArrayList<AssertionError> errors = new ArrayList<>();
-
-      /* (non-Javadoc)
-       * @see MessageHandler#onMessage(ClientMessage)
-       */
-      @Override
-      public void onMessage(ClientMessage message) {
-         try {
-            onMessageAssert(message);
-         } catch (AssertionError e) {
-            e.printStackTrace(); // System.out -> junit reports
-            errors.add(e);
-         }
-      }
-
-      public abstract void onMessageAssert(ClientMessage message);
-   }
-}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/RandomReattachIntegrationTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/RandomReattachIntegrationTest.java
index a466f43a39..ac86900ba3 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/RandomReattachIntegrationTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/RandomReattachIntegrationTest.java
@@ -16,10 +16,1341 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster.reattach;
 
-public class RandomReattachIntegrationTest extends RandomReattachTestBase {
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
-   @Override
-   protected int getNumIterations() {
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
+import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.MessageHandler;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
+import org.apache.activemq.artemis.core.remoting.impl.invm.InVMRegistry;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.lang.invoke.MethodHandles;
+
+public final class RandomReattachIntegrationTest extends ActiveMQTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+
+   private static final int RECEIVE_TIMEOUT = 10000;
+
+
+   private static final SimpleString ADDRESS = 
SimpleString.of("FailoverTestAddress");
+
+   private ActiveMQServer server;
+
+   private Timer timer;
+
+   private int getNumIterations() {
       return 2;
    }
+
+
+   @Test
+   public void testA() throws Exception {
+      runTest(new RunnableT() {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception {
+            doTestA(sf);
+         }
+      });
+   }
+
+   @Test
+   public void testB() throws Exception {
+      runTest(new RunnableT() {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception {
+            doTestB(sf);
+         }
+      });
+   }
+
+   @Test
+   public void testC() throws Exception {
+      runTest(new RunnableT() {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception {
+            doTestC(sf);
+         }
+      });
+   }
+
+   @Test
+   public void testD() throws Exception {
+      runTest(new RunnableT() {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception {
+            doTestD(sf);
+         }
+      });
+   }
+
+   @Test
+   public void testE() throws Exception {
+      runTest(new RunnableT() {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception {
+            doTestE(sf);
+         }
+      });
+   }
+
+   @Test
+   public void testF() throws Exception {
+      runTest(new RunnableT() {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception {
+            doTestF(sf);
+         }
+      });
+   }
+
+   @Test
+   public void testG() throws Exception {
+      runTest(new RunnableT() {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception {
+            doTestG(sf);
+         }
+      });
+   }
+
+   @Test
+   public void testH() throws Exception {
+      runTest(new RunnableT() {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception {
+            doTestH(sf);
+         }
+      });
+   }
+
+   @Test
+   public void testI() throws Exception {
+      runTest(new RunnableT() {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception {
+            doTestI(sf);
+         }
+      });
+   }
+
+   @Test
+   public void testJ() throws Exception {
+      runTest(new RunnableT() {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception {
+            doTestJ(sf);
+         }
+      });
+   }
+
+   @Test
+   public void testK() throws Exception {
+      runTest(new RunnableT() {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception {
+            doTestK(sf);
+         }
+      });
+   }
+
+   @Test
+   public void testL() throws Exception {
+      runTest(new RunnableT() {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception {
+            doTestL(sf);
+         }
+      });
+   }
+
+   @Test
+   public void testN() throws Exception {
+      runTest(new RunnableT() {
+         @Override
+         public void run(final ClientSessionFactory sf) throws Exception {
+            doTestN(sf);
+         }
+      });
+   }
+
+   public void runTest(final RunnableT runnable) throws Exception {
+      int numIts = getNumIterations();
+
+      for (int its = 0; its < numIts; its++) {
+         logger.debug("####{} iteration #{}", getName(), its);
+         start();
+         ServerLocator locator = 
createInVMNonHALocator().setReconnectAttempts(15).setConfirmationWindowSize(1024
 * 1024);
+
+         ClientSessionFactory sf = createSessionFactory(locator);
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         Failer failer = startFailer(250, session);
+
+         try {
+            do {
+               runnable.run(sf);
+            }
+            while (!failer.isExecuted());
+         } catch (Exception e) {
+            if (expectedException(e)) {
+               logger.info("an expected exception was thrown - {}", 
e.getMessage(), e);
+               // one more time
+               numIts++;
+            } else {
+               throw e;
+            }
+         } finally {
+            try {
+               sf.close();
+               session.close();
+            } catch (Throwable ignored) {
+            }
+            try {
+               locator.close();
+            } catch (Throwable ignored) {
+            }
+
+            try {
+               server.stop();
+            } catch (Throwable ignored) {
+            } finally {
+               server = null;
+            }
+         }
+      }
+   }
+
+   private boolean expectedException(Throwable e) {
+      // during a reattachment it's not possible to know if the previous ack 
was sent or not.
+      // this is expected and clients should deal with this situation
+      return e instanceof ActiveMQIllegalStateException && 
e.getMessage().contains("AMQ229027");
+   }
+
+
+
+   protected void doTestA(final ClientSessionFactory sf) throws Exception {
+      long start = System.currentTimeMillis();
+
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<>();
+      Set<ClientSession> sessions = new HashSet<>();
+
+      for (int i = 0; i < numSessions; i++) {
+         SimpleString subName = SimpleString.of("sub" + i);
+
+         ClientSession sessConsume = sf.createSession(false, true, true);
+
+         sessConsume.start();
+
+         
sessConsume.createQueue(QueueConfiguration.of(subName).setAddress(ADDRESS).setDurable(false));
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, true, true);
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
+         message.putIntProperty(SimpleString.of("count"), i);
+         producer.send(message);
+      }
+
+      class MyHandler extends AssertionCheckMessageHandler {
+
+         final CountDownLatch latch = new CountDownLatch(1);
+
+         int count;
+
+         @Override
+         public void onMessageAssert(final ClientMessage message) {
+            if (count == numMessages) {
+               fail("Too many messages");
+            }
+
+            assertEquals(count, 
message.getObjectProperty(SimpleString.of("count")));
+
+            count++;
+
+            try {
+               message.acknowledge();
+            } catch (ActiveMQException me) {
+               logger.error("Failed to process", me);
+            }
+
+            if (count == numMessages) {
+               latch.countDown();
+            }
+         }
+      }
+
+      Set<MyHandler> handlers = new HashSet<>();
+
+      for (ClientConsumer consumer : consumers) {
+         MyHandler handler = new MyHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (MyHandler handler : handlers) {
+         boolean ok = handler.latch.await(5000, TimeUnit.MILLISECONDS);
+
+         handler.checkAssertions();
+
+         assertTrue(ok, "Didn't receive all messages");
+      }
+
+      sessSend.close();
+      for (ClientSession session : sessions) {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++) {
+         SimpleString subName = SimpleString.of("sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      logger.debug("duration {}", (end - start));
+   }
+
+   protected void doTestB(final ClientSessionFactory sf) throws Exception {
+      long start = System.currentTimeMillis();
+
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 50;
+
+      Set<ClientConsumer> consumers = new HashSet<>();
+      Set<ClientSession> sessions = new HashSet<>();
+
+      for (int i = 0; i < numSessions; i++) {
+         SimpleString subName = SimpleString.of("sub" + i);
+
+         ClientSession sessConsume = sf.createSession(false, true, true);
+
+         
sessConsume.createQueue(QueueConfiguration.of(subName).setAddress(ADDRESS).setDurable(false));
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, true, true);
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
+         message.putIntProperty(SimpleString.of("count"), i);
+         producer.send(message);
+      }
+
+      for (ClientSession session : sessions) {
+         session.start();
+      }
+
+      class MyHandler extends AssertionCheckMessageHandler {
+
+         final CountDownLatch latch = new CountDownLatch(1);
+
+         int count;
+
+         @Override
+         public void onMessageAssert(final ClientMessage message) {
+            if (count == numMessages) {
+               fail("Too many messages");
+            }
+
+            assertEquals(count, 
message.getObjectProperty(SimpleString.of("count")));
+
+            count++;
+
+            if (count == numMessages) {
+               latch.countDown();
+            }
+         }
+      }
+
+      Set<MyHandler> handlers = new HashSet<>();
+
+      for (ClientConsumer consumer : consumers) {
+         MyHandler handler = new MyHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (MyHandler handler : handlers) {
+         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+         handler.checkAssertions();
+
+         assertTrue(ok);
+      }
+
+      sessSend.close();
+
+      for (ClientSession session : sessions) {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++) {
+         SimpleString subName = SimpleString.of("sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      logger.debug("duration {}", (end - start));
+
+   }
+
+   protected void doTestC(final ClientSessionFactory sf) throws Exception {
+      long start = System.currentTimeMillis();
+
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 1;
+
+      Set<ClientConsumer> consumers = new HashSet<>();
+      Set<ClientSession> sessions = new HashSet<>();
+
+      for (int i = 0; i < numSessions; i++) {
+         SimpleString subName = SimpleString.of("sub" + i);
+
+         ClientSession sessConsume = sf.createSession(false, false, false);
+
+         sessConsume.start();
+
+         
sessConsume.createQueue(QueueConfiguration.of(subName).setAddress(ADDRESS).setDurable(false));
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, false, true);
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
+         message.putIntProperty(SimpleString.of("count"), i);
+         producer.send(message);
+      }
+
+      sessSend.rollback();
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
+         message.putIntProperty(SimpleString.of("count"), i);
+         producer.send(message);
+      }
+
+      sessSend.commit();
+
+      class MyHandler extends AssertionCheckMessageHandler {
+
+         final CountDownLatch latch = new CountDownLatch(1);
+
+         int count;
+
+         @Override
+         public void onMessageAssert(final ClientMessage message) {
+            if (count == numMessages) {
+               fail("Too many messages, expected " + count);
+            }
+
+            assertEquals(count, 
message.getObjectProperty(SimpleString.of("count")));
+
+            count++;
+
+            try {
+               message.acknowledge();
+            } catch (ActiveMQException e) {
+               e.printStackTrace();
+               throw new RuntimeException(e.getMessage(), e);
+            }
+
+            if (count == numMessages) {
+               latch.countDown();
+            }
+         }
+      }
+
+      Set<MyHandler> handlers = new HashSet<>();
+
+      for (ClientConsumer consumer : consumers) {
+         MyHandler handler = new MyHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (MyHandler handler : handlers) {
+         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+         assertTrue(ok);
+
+         handler.checkAssertions();
+      }
+
+      handlers.clear();
+
+      // New handlers
+      for (ClientConsumer consumer : consumers) {
+         MyHandler handler = new MyHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (ClientSession session : sessions) {
+         session.rollback();
+      }
+
+      for (MyHandler handler : handlers) {
+         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+         assertTrue(ok);
+
+         handler.checkAssertions();
+      }
+
+      for (ClientSession session : sessions) {
+         session.commit();
+      }
+
+      sessSend.close();
+      for (ClientSession session : sessions) {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++) {
+         SimpleString subName = SimpleString.of("sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      logger.debug("duration {}", (end - start));
+   }
+
+   protected void doTestD(final ClientSessionFactory sf) throws Exception {
+      long start = System.currentTimeMillis();
+
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<>();
+      Set<ClientSession> sessions = new HashSet<>();
+
+      for (int i = 0; i < numSessions; i++) {
+         SimpleString subName = SimpleString.of("sub" + i);
+
+         ClientSession sessConsume = sf.createSession(false, false, false);
+
+         
sessConsume.createQueue(QueueConfiguration.of(subName).setAddress(ADDRESS).setDurable(false));
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, false, true);
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
+         message.putIntProperty(SimpleString.of("count"), i);
+         producer.send(message);
+      }
+
+      sessSend.rollback();
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
+         message.putIntProperty(SimpleString.of("count"), i);
+         producer.send(message);
+      }
+
+      sessSend.commit();
+
+      for (ClientSession session : sessions) {
+         session.start();
+      }
+
+      class MyHandler extends AssertionCheckMessageHandler {
+
+         final CountDownLatch latch = new CountDownLatch(1);
+
+         int count;
+
+         @Override
+         public void onMessageAssert(final ClientMessage message) {
+            if (count == numMessages) {
+               fail("Too many messages, " + count);
+            }
+
+            assertEquals(count, 
message.getObjectProperty(SimpleString.of("count")));
+
+            count++;
+
+            if (count == numMessages) {
+               latch.countDown();
+            }
+         }
+      }
+
+      Set<MyHandler> handlers = new HashSet<>();
+
+      for (ClientConsumer consumer : consumers) {
+         MyHandler handler = new MyHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (MyHandler handler : handlers) {
+         boolean ok = handler.latch.await(20000, TimeUnit.MILLISECONDS);
+
+         assertTrue(ok);
+
+         handler.checkAssertions();
+      }
+
+      handlers.clear();
+
+      // New handlers
+      for (ClientConsumer consumer : consumers) {
+         MyHandler handler = new MyHandler();
+
+         consumer.setMessageHandler(handler);
+
+         handlers.add(handler);
+      }
+
+      for (ClientSession session : sessions) {
+         session.rollback();
+      }
+
+      for (MyHandler handler : handlers) {
+         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+
+         assertTrue(ok);
+
+         handler.checkAssertions();
+      }
+
+      for (ClientSession session : sessions) {
+         session.commit();
+      }
+
+      sessSend.close();
+      for (ClientSession session : sessions) {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++) {
+         SimpleString subName = SimpleString.of("sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      logger.debug("duration {}", (end - start));
+   }
+
+   // Now with synchronous receive()
+
+   protected void doTestE(final ClientSessionFactory sf) throws Exception {
+      long start = System.currentTimeMillis();
+
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<>();
+      Set<ClientSession> sessions = new HashSet<>();
+
+      for (int i = 0; i < numSessions; i++) {
+         SimpleString subName = SimpleString.of("sub" + i);
+
+         ClientSession sessConsume = sf.createSession(false, true, true);
+
+         sessConsume.start();
+
+         
sessConsume.createQueue(QueueConfiguration.of(subName).setAddress(ADDRESS).setDurable(false));
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, true, true);
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
+         message.putIntProperty(SimpleString.of("count"), i);
+         producer.send(message);
+      }
+
+      for (int i = 0; i < numMessages; i++) {
+         for (ClientConsumer consumer : consumers) {
+            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+
+            assertNotNull(msg);
+
+            assertEquals(i, msg.getObjectProperty(SimpleString.of("count")));
+
+            msg.acknowledge();
+         }
+      }
+
+      for (int i = 0; i < numMessages; i++) {
+         for (ClientConsumer consumer : consumers) {
+            ClientMessage msg = consumer.receiveImmediate();
+
+            assertNull(msg);
+         }
+      }
+
+      sessSend.close();
+      for (ClientSession session : sessions) {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++) {
+         SimpleString subName = SimpleString.of("sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      logger.debug("duration {}", (end - start));
+   }
+
+   protected void doTestF(final ClientSessionFactory sf) throws Exception {
+      long start = System.currentTimeMillis();
+
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<>();
+      Set<ClientSession> sessions = new HashSet<>();
+
+      for (int i = 0; i < numSessions; i++) {
+         SimpleString subName = SimpleString.of("sub" + i);
+
+         ClientSession sessConsume = sf.createSession(false, true, true);
+
+         
sessConsume.createQueue(QueueConfiguration.of(subName).setAddress(ADDRESS).setDurable(false));
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, true, true);
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
+         message.putIntProperty(SimpleString.of("count"), i);
+         producer.send(message);
+      }
+
+      for (ClientSession session : sessions) {
+         session.start();
+      }
+
+      for (int i = 0; i < numMessages; i++) {
+         for (ClientConsumer consumer : consumers) {
+            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+
+            if (msg == null) {
+               throw new IllegalStateException("Failed to receive message " + 
i);
+            }
+
+            assertNotNull(msg);
+
+            assertEquals(i, msg.getObjectProperty(SimpleString.of("count")));
+
+            msg.acknowledge();
+         }
+      }
+
+      for (int i = 0; i < numMessages; i++) {
+         for (ClientConsumer consumer : consumers) {
+            ClientMessage msg = consumer.receiveImmediate();
+
+            assertNull(msg);
+         }
+      }
+
+      sessSend.close();
+      for (ClientSession session : sessions) {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++) {
+         SimpleString subName = SimpleString.of("sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      assertEquals(1, ((ClientSessionFactoryImpl) sf).numSessions());
+
+      long end = System.currentTimeMillis();
+
+      logger.debug("duration {}", (end - start));
+   }
+
+   protected void doTestG(final ClientSessionFactory sf) throws Exception {
+      long start = System.currentTimeMillis();
+
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<>();
+      Set<ClientSession> sessions = new HashSet<>();
+
+      for (int i = 0; i < numSessions; i++) {
+         SimpleString subName = SimpleString.of("sub" + i);
+
+         ClientSession sessConsume = sf.createSession(false, false, false);
+
+         sessConsume.start();
+
+         
sessConsume.createQueue(QueueConfiguration.of(subName).setAddress(ADDRESS).setDurable(false));
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, false, false);
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
+         message.putIntProperty(SimpleString.of("count"), i);
+         producer.send(message);
+      }
+
+      sessSend.rollback();
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
+         message.putIntProperty(SimpleString.of("count"), i);
+         producer.send(message);
+      }
+
+      sessSend.commit();
+
+      for (int i = 0; i < numMessages; i++) {
+         for (ClientConsumer consumer : consumers) {
+            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+
+            assertNotNull(msg);
+
+            assertEquals(i, msg.getObjectProperty(SimpleString.of("count")));
+
+            msg.acknowledge();
+         }
+      }
+
+      for (ClientConsumer consumer : consumers) {
+         ClientMessage msg = consumer.receiveImmediate();
+
+         assertNull(msg);
+      }
+
+      for (ClientSession session : sessions) {
+         session.rollback();
+      }
+
+      for (int i = 0; i < numMessages; i++) {
+         for (ClientConsumer consumer : consumers) {
+            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+
+            assertNotNull(msg);
+
+            assertEquals(i, msg.getObjectProperty(SimpleString.of("count")));
+
+            msg.acknowledge();
+         }
+      }
+
+      for (int i = 0; i < numMessages; i++) {
+         for (ClientConsumer consumer : consumers) {
+            ClientMessage msg = consumer.receiveImmediate();
+
+            assertNull(msg);
+         }
+      }
+
+      for (ClientSession session : sessions) {
+         session.commit();
+      }
+
+      sessSend.close();
+      for (ClientSession session : sessions) {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++) {
+         SimpleString subName = SimpleString.of("sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      logger.debug("duration {}", (end - start));
+   }
+
+   protected void doTestH(final ClientSessionFactory sf) throws Exception {
+      long start = System.currentTimeMillis();
+
+      ClientSession s = sf.createSession(false, false, false);
+
+      final int numMessages = 100;
+
+      final int numSessions = 10;
+
+      Set<ClientConsumer> consumers = new HashSet<>();
+      Set<ClientSession> sessions = new HashSet<>();
+
+      for (int i = 0; i < numSessions; i++) {
+         SimpleString subName = SimpleString.of("sub" + i);
+
+         ClientSession sessConsume = sf.createSession(false, false, false);
+
+         
sessConsume.createQueue(QueueConfiguration.of(subName).setAddress(ADDRESS).setDurable(false));
+
+         ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+         consumers.add(consumer);
+
+         sessions.add(sessConsume);
+      }
+
+      ClientSession sessSend = sf.createSession(false, false, false);
+
+      ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
+         message.putIntProperty(SimpleString.of("count"), i);
+         producer.send(message);
+      }
+
+      sessSend.rollback();
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = 
sessSend.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
+         message.putIntProperty(SimpleString.of("count"), i);
+         producer.send(message);
+      }
+
+      sessSend.commit();
+
+      for (ClientSession session : sessions) {
+         session.start();
+      }
+
+      for (int i = 0; i < numMessages; i++) {
+         for (ClientConsumer consumer : consumers) {
+            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+
+            assertNotNull(msg);
+
+            assertEquals(i, msg.getObjectProperty(SimpleString.of("count")));
+
+            msg.acknowledge();
+         }
+      }
+
+      for (int i = 0; i < numMessages; i++) {
+         for (ClientConsumer consumer : consumers) {
+            ClientMessage msg = consumer.receiveImmediate();
+
+            assertNull(msg);
+         }
+      }
+
+      for (ClientSession session : sessions) {
+         session.rollback();
+      }
+
+      for (int i = 0; i < numMessages; i++) {
+         for (ClientConsumer consumer : consumers) {
+            ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+
+            assertNotNull(msg);
+
+            assertEquals(i, msg.getObjectProperty(SimpleString.of("count")));
+
+            msg.acknowledge();
+         }
+      }
+
+      for (int i = 0; i < numMessages; i++) {
+         for (ClientConsumer consumer : consumers) {
+            ClientMessage msg = consumer.receiveImmediate();
+
+            assertNull(msg);
+         }
+      }
+
+      for (ClientSession session : sessions) {
+         session.commit();
+      }
+
+      sessSend.close();
+      for (ClientSession session : sessions) {
+         session.close();
+      }
+
+      for (int i = 0; i < numSessions; i++) {
+         SimpleString subName = SimpleString.of("sub" + i);
+
+         s.deleteQueue(subName);
+      }
+
+      s.close();
+
+      long end = System.currentTimeMillis();
+
+      logger.debug("duration {}", (end - start));
+   }
+
+   protected void doTestI(final ClientSessionFactory sf) throws Exception {
+      ClientSession sessCreate = sf.createSession(false, true, true);
+
+      sessCreate.createQueue(QueueConfiguration.of(ADDRESS).setDurable(false));
+
+      ClientSession sess = sf.createSession(false, true, true);
+
+      sess.start();
+
+      ClientConsumer consumer = sess.createConsumer(ADDRESS);
+
+      ClientProducer producer = sess.createProducer(ADDRESS);
+
+      ClientMessage message = sess.createMessage(ActiveMQTextMessage.TYPE, 
false, 0, System.currentTimeMillis(), (byte) 1);
+      producer.send(message);
+
+      ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
+
+      assertNotNull(message2);
+
+      message2.acknowledge();
+
+      sess.close();
+
+      sessCreate.deleteQueue(ADDRESS);
+
+      sessCreate.close();
+   }
+
+   protected void doTestJ(final ClientSessionFactory sf) throws Exception {
+      ClientSession sessCreate = sf.createSession(false, true, true);
+
+      sessCreate.createQueue(QueueConfiguration.of(ADDRESS).setDurable(false));
+
+      ClientSession sess = sf.createSession(false, true, true);
+
+      sess.start();
+
+      ClientConsumer consumer = sess.createConsumer(ADDRESS);
+
+      ClientProducer producer = sess.createProducer(ADDRESS);
+
+      ClientMessage message = sess.createMessage(ActiveMQTextMessage.TYPE, 
false, 0, System.currentTimeMillis(), (byte) 1);
+      producer.send(message);
+
+      ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
+
+      assertNotNull(message2);
+
+      message2.acknowledge();
+
+      sess.close();
+
+      sessCreate.deleteQueue(ADDRESS);
+
+      sessCreate.close();
+   }
+
+   protected void doTestK(final ClientSessionFactory sf) throws Exception {
+      ClientSession s = sf.createSession(false, false, false);
+
+      s.createQueue(QueueConfiguration.of(ADDRESS).setDurable(false));
+
+      final int numConsumers = 100;
+
+      for (int i = 0; i < numConsumers; i++) {
+         ClientConsumer consumer = s.createConsumer(ADDRESS);
+
+         consumer.close();
+      }
+
+      s.deleteQueue(ADDRESS);
+
+      s.close();
+   }
+
+   protected void doTestL(final ClientSessionFactory sf) throws Exception {
+      final int numSessions = 10;
+
+      for (int i = 0; i < numSessions; i++) {
+         ClientSession session = sf.createSession(false, false, false);
+
+         session.close();
+      }
+   }
+
+   protected void doTestN(final ClientSessionFactory sf) throws Exception {
+      ClientSession sessCreate = sf.createSession(false, true, true);
+
+      sessCreate.createQueue(QueueConfiguration.of(ADDRESS).setDurable(false));
+
+      ClientSession sess = sf.createSession(false, true, true);
+
+      sess.stop();
+
+      sess.start();
+
+      sess.stop();
+
+      ClientConsumer consumer = sess.createConsumer(ADDRESS);
+
+      ClientProducer producer = sess.createProducer(ADDRESS);
+
+      ClientMessage message = sess.createMessage(ActiveMQTextMessage.TYPE, 
false, 0, System.currentTimeMillis(), (byte) 1);
+      producer.send(message);
+
+      sess.start();
+
+      ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
+
+      assertNotNull(message2);
+
+      message2.acknowledge();
+
+      sess.stop();
+
+      sess.start();
+
+      sess.close();
+
+      sessCreate.deleteQueue(SimpleString.of(ADDRESS.toString()));
+
+      sessCreate.close();
+   }
+
+   @Override
+   @BeforeEach
+   public void setUp() throws Exception {
+      super.setUp();
+
+      timer = new Timer(true);
+   }
+
+   @Override
+   @AfterEach
+   public void tearDown() throws Exception {
+      timer.cancel();
+
+      super.tearDown();
+   }
+
+
+   private Failer startFailer(final long time, final ClientSession session) {
+      Failer failer = new Failer((ClientSessionInternal) session);
+
+      timer.schedule(failer, (long) (time * Math.random()), 100);
+
+      return failer;
+   }
+
+   private void start() throws Exception {
+      server = 
addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig(), false));
+      server.start();
+   }
+
+   private void stop() throws Exception {
+      server.stop();
+      Wait.assertFalse(server::isStarted, 5_000, 100);
+
+      Wait.assertEquals(0, () -> InVMRegistry.instance.size(), 5_000, 100);
+
+      server = null;
+   }
+
+
+   class Failer extends TimerTask {
+
+      private final ClientSessionInternal session;
+
+      private boolean executed;
+
+      Failer(final ClientSessionInternal session) {
+         this.session = session;
+      }
+
+      @Override
+      public synchronized void run() {
+         logger.debug("** Failing connection");
+
+         session.getConnection().fail(new 
ActiveMQNotConnectedException("oops"));
+
+         logger.debug("** Fail complete");
+
+         cancel();
+
+         executed = true;
+      }
+
+      public synchronized boolean isExecuted() {
+         return executed;
+      }
+   }
+
+   public abstract class RunnableT {
+
+      abstract void run(ClientSessionFactory sf) throws Exception;
+   }
+
+   abstract static class AssertionCheckMessageHandler implements 
MessageHandler {
+
+      public void checkAssertions() {
+         for (AssertionError e : errors) {
+            // it will throw the first error
+            throw e;
+         }
+      }
+
+      private final ArrayList<AssertionError> errors = new ArrayList<>();
+
+      /* (non-Javadoc)
+       * @see MessageHandler#onMessage(ClientMessage)
+       */
+      @Override
+      public void onMessage(ClientMessage message) {
+         try {
+            onMessageAssert(message);
+         } catch (AssertionError e) {
+            e.printStackTrace(); // System.out -> junit reports
+            errors.add(e);
+         }
+      }
+
+      public abstract void onMessageAssert(ClientMessage message);
+   }
 }
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/failover/RandomFailoverSoakTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/failover/RandomFailoverSoakTest.java
deleted file mode 100644
index 4e3d9d6a3e..0000000000
--- 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/failover/RandomFailoverSoakTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.soak.failover;
-
-import 
org.apache.activemq.artemis.tests.integration.cluster.reattach.RandomReattachTestBase;
-
-import static org.apache.activemq.artemis.utils.TestParameters.testProperty;
-
-public class RandomFailoverSoakTest extends RandomReattachTestBase {
-
-   private static final String TEST_NAME = "RANDOM";
-   public static final int TEST_REPETITION = testProperty(TEST_NAME, 
"TEST_REPETITION", 10);
-
-   @Override
-   protected int getNumIterations() {
-      return TEST_REPETITION;
-   }
-
-}
diff --git 
a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/failover/RandomReattachStressTest.java
 
b/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/failover/RandomReattachStressTest.java
deleted file mode 100644
index 2cea3f46b6..0000000000
--- 
a/tests/stress-tests/src/test/java/org/apache/activemq/artemis/tests/stress/failover/RandomReattachStressTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.stress.failover;
-
-import 
org.apache.activemq.artemis.tests.integration.cluster.reattach.RandomReattachTestBase;
-
-public class RandomReattachStressTest extends RandomReattachTestBase {
-
-
-
-
-
-   @Override
-   protected int getNumIterations() {
-      return 100;
-   }
-
-
-
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to