This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit b041f2cde2fe7d8c9d74708ceef53501b603e88b
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Nov 8 09:16:06 2023 -0500

    ARTEMIS-4476 Client Failures Soak Test
    
    I was not able to reproduce the actual issue here, but I heavily used this test during debugging.
    This will not serve as a reproducer for the Ghost consumer issue, but it is still a valid test.
---
 .../apache/activemq/artemis/util/ServerUtil.java   |  23 +-
 .../api/core/management/SimpleManagement.java      |  26 ++
 .../activemq/artemis/utils/RealServerTestBase.java |  16 +-
 .../soak/clientFailure/ClientFailureSoakTest.java  | 273 +++++++++++++++++++++
 .../clientFailure/ClientFailureSoakTestClient.java | 121 +++++++++
 tests/soak-tests/src/test/scripts/parameters.sh    |  31 +++
 6 files changed, 486 insertions(+), 4 deletions(-)

diff --git 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/util/ServerUtil.java 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/util/ServerUtil.java
index c5575368d0..f2a3b0da7f 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/util/ServerUtil.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/util/ServerUtil.java
@@ -48,7 +48,11 @@ public class ServerUtil {
     * @throws Exception
     */
    public static Process startServer(String artemisInstance, String 
serverName, int id, int timeout) throws Exception {
-      final Process process = internalStartServer(artemisInstance, serverName);
+      return startServer(artemisInstance, serverName, id, timeout, null);
+   }
+
+   public static Process startServer(String artemisInstance, String 
serverName, int id, int timeout, File brokerProperties) throws Exception {
+      final Process process = internalStartServer(artemisInstance, serverName, 
brokerProperties);
 
       // wait for start
       if (timeout != 0) {
@@ -59,7 +63,11 @@ public class ServerUtil {
    }
 
    public static Process startServer(String artemisInstance, String 
serverName, String uri, int timeout) throws Exception {
-      final Process process = internalStartServer(artemisInstance, serverName);
+      return  startServer(artemisInstance, serverName, uri, timeout, null);
+   }
+
+   public static Process startServer(String artemisInstance, String 
serverName, String uri, int timeout, File propertiesFile) throws Exception {
+      final Process process = internalStartServer(artemisInstance, serverName, 
propertiesFile);
 
       // wait for start
       if (timeout != 0) {
@@ -71,8 +79,17 @@ public class ServerUtil {
 
    private static Process internalStartServer(String artemisInstance,
                                               String serverName) throws 
IOException, ClassNotFoundException {
+      return internalStartServer(artemisInstance, serverName, null);
+   }
+   private static Process internalStartServer(String artemisInstance,
+                                              String serverName,
+                                              File propertiesFile) throws 
IOException, ClassNotFoundException {
 
-      return execute(artemisInstance, serverName, "run");
+      if (propertiesFile != null) {
+         return execute(artemisInstance, serverName, "run", "--properties", 
propertiesFile.getAbsolutePath());
+      } else {
+         return execute(artemisInstance, serverName, "run");
+      }
    }
 
    public static Process execute(String artemisInstance, String jobName, 
String...args) throws IOException, ClassNotFoundException {
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/SimpleManagement.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/SimpleManagement.java
index c63d9fa2db..a51267e256 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/SimpleManagement.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/SimpleManagement.java
@@ -20,6 +20,7 @@ package org.apache.activemq.artemis.api.core.management;
 import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -94,10 +95,29 @@ public class SimpleManagement implements AutoCloseable {
       return responseLong.get();
    }
 
+   public int simpleManagementInt(String resource, String method, Object... 
parameters) throws Exception {
+      AtomicInteger responseInt = new AtomicInteger();
+      doManagement((m) -> setupCall(m, resource, method, parameters), m -> 
setIntResult(m, responseInt), SimpleManagement::failed);
+      return responseInt.get();
+   }
+
    public long getMessageCountOnQueue(String queueName) throws Exception {
       return simpleManagementLong(ResourceNames.QUEUE + queueName, 
"getMessageCount");
    }
 
+   public int getDeliveringCountOnQueue(String queueName) throws Exception {
+      return simpleManagementInt(ResourceNames.QUEUE + queueName, 
"getDeliveringCount");
+   }
+
+   public int getNumberOfConsumersOnQueue(String queueName) throws Exception {
+      String responseString = simpleManagement(ResourceNames.QUEUE + 
queueName, "listConsumersAsJSON");
+
+      JsonArray consumersAsJSON = JsonUtil.readJsonArray(responseString);
+
+      return consumersAsJSON.size();
+   }
+
+
    public long getMessagesAddedOnQueue(String queueName) throws Exception {
       return simpleManagementLong(ResourceNames.QUEUE + queueName, 
"getMessagesAdded");
    }
@@ -154,6 +174,12 @@ public class SimpleManagement implements AutoCloseable {
       result.set(resultLong);
    }
 
+   protected static void setIntResult(ClientMessage m, AtomicInteger result) 
throws Exception {
+      int resultInt = (int)ManagementHelper.getResult(m, Integer.class);
+      logger.debug("management result:: {}", resultInt);
+      result.set(resultInt);
+   }
+
    protected void doManagement(ManagementHelper.MessageAcceptor setup, 
ManagementHelper.MessageAcceptor ok, ManagementHelper.MessageAcceptor failed) 
throws Exception {
       if (session != null) {
          ManagementHelper.doManagement(session, setup, ok, failed);
diff --git 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/RealServerTestBase.java
 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/RealServerTestBase.java
index 7980fd577c..aef00d8766 100644
--- 
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/RealServerTestBase.java
+++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/RealServerTestBase.java
@@ -22,13 +22,17 @@ import javax.management.ObjectName;
 import javax.management.remote.JMXConnector;
 import javax.management.remote.JMXConnectorFactory;
 import javax.management.remote.JMXServiceURL;
+import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.lang.invoke.MethodHandles;
 import java.net.MalformedURLException;
 import java.util.HashSet;
+import java.util.Properties;
 import java.util.Set;
 
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -112,7 +116,11 @@ public class RealServerTestBase extends ActiveMQTestBase {
    }
 
    public Process startServer(String serverName, int portID, int timeout) 
throws Exception {
-      Process process = ServerUtil.startServer(getServerLocation(serverName), 
serverName, portID, timeout);
+      return startServer(serverName, portID, timeout, null);
+   }
+
+   public Process startServer(String serverName, int portID, int timeout, File 
brokerProperties) throws Exception {
+      Process process = ServerUtil.startServer(getServerLocation(serverName), 
serverName, portID, timeout, brokerProperties);
       addProcess(process);
       return process;
    }
@@ -272,4 +280,10 @@ public class RealServerTestBase extends ActiveMQTestBase {
       return false;
    }
 
+   protected static void saveProperties(Properties properties, File 
propertiesFile) throws Exception {
+      OutputStream outputStream = new BufferedOutputStream(new 
FileOutputStream(propertiesFile));
+      properties.store(outputStream, "# Broker properties");
+      outputStream.close();
+   }
+
 }
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/clientFailure/ClientFailureSoakTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/clientFailure/ClientFailureSoakTest.java
new file mode 100644
index 0000000000..2be261258d
--- /dev/null
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/clientFailure/ClientFailureSoakTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.clientFailure;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import org.apache.activemq.artemis.tests.soak.SoakTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.SpawnedVMSupport;
+import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.activemq.artemis.utils.TestParameters.testProperty;
+
+@RunWith(Parameterized.class)
+public class ClientFailureSoakTest extends SoakTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   Process serverProcess;
+
+   public static final String SERVER_NAME_0 = "clientFailure";
+
+   private static File brokerPropertiesFile;
+
+   @BeforeClass
+   public static void createServers() throws Exception {
+      File serverLocation = getFileServerLocation(SERVER_NAME_0);
+      deleteDirectory(serverLocation);
+      HelperCreate cliCreateServer = new HelperCreate();
+      
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
+      cliCreateServer.setArgs("--global-max-messages", "500000", 
"--java-options", "-ea", "--java-options", "-Xmx512M", "--queues", 
"CLIENT_TEST,OUT_QUEUE");
+      cliCreateServer.createServer();
+
+      // Creating a broker properties file instead of providing a new 
broker.xml just for these options
+      Properties brokerProperties = new Properties();
+      brokerProperties.put("addressesSettings.#.redeliveryDelay", "0");
+      brokerProperties.put("addressesSettings.#.maxDeliveryAttempts", "-1");
+      brokerPropertiesFile = new File(serverLocation, "broker.properties");
+      saveProperties(brokerProperties, brokerPropertiesFile);
+   }
+
+   private static final String QUEUE_NAME = "CLIENT_TEST";
+
+   private static final String TEST_NAME = "CLIENT_FAILURE";
+
+   private final String protocol;
+   private static final boolean TEST_ENABLED = 
Boolean.parseBoolean(testProperty(TEST_NAME, "TEST_ENABLED", "true"));
+   private static final String PROTOCOL_LIST = testProperty(TEST_NAME, 
"PROTOCOL_LIST", "OPENWIRE,AMQP,CORE");
+
+   private boolean USE_LARGE_MESSAGE;
+
+   private final int THREADS_PER_VM;
+   private final int CLIENT_CONSUMERS_PER_THREAD;
+   private final int TEST_REPEATS;
+   private final int TOTAL_ITERATIONS;
+   private final int NUMBER_OF_VMS;
+   private final int NUMBER_OF_MESSAGES;
+   private final String MEMORY_CLIENT;
+
+   @Parameterized.Parameters(name = "protocol={0}")
+   public static Collection<Object[]> parameters() {
+      String[] protocols = PROTOCOL_LIST.split(",");
+
+      ArrayList<Object[]> parameters = new ArrayList<>();
+      for (String str : protocols) {
+         logger.info("Adding {} to the list for the test", str);
+         parameters.add(new Object[]{str});
+      }
+
+      return parameters;
+   }
+
+   @Before
+   public void before() throws Exception {
+      Assume.assumeTrue(TEST_ENABLED);
+      cleanupData(SERVER_NAME_0);
+
+      serverProcess = startServer(SERVER_NAME_0, 0, 30_000, 
brokerPropertiesFile);
+   }
+
+   public ClientFailureSoakTest(String protocol) {
+      this.protocol = protocol;
+
+      THREADS_PER_VM = testProperty(TEST_NAME, protocol + "_THREADS_PER_VM", 
6);
+      USE_LARGE_MESSAGE = Boolean.valueOf(testProperty(TEST_NAME, protocol + 
"_USE_LARGE_MESSAGE", "false"));
+      CLIENT_CONSUMERS_PER_THREAD = testProperty(TEST_NAME, protocol + 
"_CLIENT_CONSUMERS_PER_THREAD", 10);
+      TEST_REPEATS = testProperty(TEST_NAME, protocol + "_TEST_REPEATS", 1);
+      TOTAL_ITERATIONS = testProperty(TEST_NAME, protocol + 
"_TOTAL_ITERATION", 2);
+      NUMBER_OF_VMS = testProperty(TEST_NAME, protocol + "_NUMBER_OF_VMS", 5);
+      NUMBER_OF_MESSAGES = testProperty(TEST_NAME, protocol + 
"_NUMBER_OF_MESSAGES", 1_000);
+      MEMORY_CLIENT = testProperty(TEST_NAME, protocol + "_MEMORY_CLIENT", 
"-Xmx128m");
+   }
+
+   @Test
+   public void testSoakClientFailures() throws Exception {
+      SimpleManagement simpleManagement = new 
SimpleManagement("tcp://localhost:61616", null, null);
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61616");
+      if (protocol.equals("OPENWIRE")) {
+         RedeliveryPolicy inifinitePolicy = new RedeliveryPolicy();
+         inifinitePolicy.setMaximumRedeliveries(-1);
+         
((ActiveMQConnectionFactory)factory).setRedeliveryPolicy(inifinitePolicy);
+      }
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         Queue queue = session.createQueue(QUEUE_NAME);
+         MessageProducer producer = session.createProducer(queue);
+
+         String largeBody;
+
+         {
+            StringBuilder builder = new StringBuilder();
+            while (builder.length() < 150 * 1024) {
+               builder.append("This is a large string... LOREM IPSUM WHATEVER 
IT SAYS IN THAT THING... ");
+            }
+            largeBody = builder.toString();
+         }
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            Message message;
+            if (i % 100 == 0 && USE_LARGE_MESSAGE) {
+               message = session.createTextMessage(largeBody);
+            } else {
+               message = session.createTextMessage("text " + i);
+            }
+            message.setIntProperty("i", i);
+            producer.send(message);
+            if (i > 0 && i % 1000 == 0) {
+               logger.info("Sent {} messages", i);
+               session.commit();
+            }
+         }
+         session.commit();
+      }
+
+      ExecutorService service = Executors.newFixedThreadPool(NUMBER_OF_VMS);
+      runAfter(service::shutdownNow);
+
+      AtomicInteger errors = new AtomicInteger(0);
+
+      for (int testRepeat = 0; testRepeat < TEST_REPEATS; testRepeat++) {
+         
logger.info("\n*******************************************************************************************************************************"
 + "\nTest repeat {}" + 
"\n*******************************************************************************************************************************",
 testRepeat);
+
+         CountDownLatch done = new CountDownLatch(NUMBER_OF_VMS);
+
+         for (int i = 0; i < NUMBER_OF_VMS; i++) {
+            int threadID = i;
+            service.execute(() -> {
+               try {
+                  for (int it = 0; it < TOTAL_ITERATIONS; it++) {
+                     
logger.info("\n*******************************************************************************************************************************"
 + "\nThread {} iteration {}" + 
"\n*******************************************************************************************************************************",
 threadID, it);
+                     Process process = SpawnedVMSupport.spawnVM(null, null, 
ClientFailureSoakTestClient.class.getName(), "-Xms128m", MEMORY_CLIENT, new 
String[]{}, true, true, protocol, String.valueOf(THREADS_PER_VM), 
String.valueOf(CLIENT_CONSUMERS_PER_THREAD), QUEUE_NAME);
+                     logger.info("Started process");
+                     Assert.assertTrue(process.waitFor(10, TimeUnit.HOURS));
+                     
Assert.assertEquals(ClientFailureSoakTestClient.RETURN_OK, process.exitValue());
+                  }
+               } catch (Throwable throwable) {
+                  logger.warn(throwable.getMessage(), throwable);
+                  errors.incrementAndGet();
+               } finally {
+                  done.countDown();
+               }
+            });
+         }
+
+         Assert.assertTrue(done.await(10, TimeUnit.HOURS));
+
+         if (errors.get() != 0) {
+            logger.warn("There were errors in previous executions:: {}. We 
will look into the receiving part now, but beware of previous errors", 
errors.get());
+         } else {
+            logger.info("No errors on any previous execution, checking 
consumer now");
+         }
+
+         int outOfOrder = 0;
+
+         try {
+            Wait.assertEquals(0, () -> 
simpleManagement.getDeliveringCountOnQueue(QUEUE_NAME), 60_000, 100);
+            Wait.assertEquals(0, () -> 
simpleManagement.getNumberOfConsumersOnQueue(QUEUE_NAME), 60_000, 100);
+            Wait.assertEquals((long) NUMBER_OF_MESSAGES, () -> 
simpleManagement.getMessageCountOnQueue(QUEUE_NAME), 60_000, 500);
+         } catch (Throwable e) {
+            logger.warn(e.getMessage(), e);
+            errors.incrementAndGet();
+         }
+
+         try (Connection connection = factory.createConnection()) {
+            connection.start();
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue(QUEUE_NAME);
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            Wait.assertEquals(1, () -> 
simpleManagement.getNumberOfConsumersOnQueue(QUEUE_NAME), 60_000, 100);
+
+            HashSet<Integer> receivedIDs = new HashSet<>();
+
+            for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+               Message message = consumer.receive(60_000);
+               Assert.assertNotNull(message);
+
+               if (!receivedIDs.add(message.getIntProperty("i"))) {
+                  logger.warn("Message {} received in duplicate", 
message.getIntProperty("i"));
+                  Assert.fail("Message " + message.getIntProperty("i") + " 
received in duplicate");
+               }
+
+               if (i != message.getIntProperty("i")) {
+                  Assert.fail("Message " + message.getIntProperty("i") + " 
received out of order, when it was supposed to be " + i + " with body size = " 
+ ((TextMessage) message).getText().length());
+                  logger.info("message {} received out of order. Expected {}", 
message.getIntProperty("i"), i);
+                  outOfOrder++;
+               }
+
+               if (i % 1000 == 0) {
+                  logger.info("Received {} messages with {} outOfOrder", i, 
outOfOrder);
+               }
+            }
+            logger.info("Received {} messages outOfOrder", outOfOrder);
+            Assert.assertNull(consumer.receiveNoWait());
+            session.rollback();
+            Assert.assertEquals(0, outOfOrder);
+         }
+
+         Wait.assertEquals(0, () -> 
simpleManagement.getDeliveringCountOnQueue(QUEUE_NAME), 10_000, 100);
+         Wait.assertEquals(0, () -> 
simpleManagement.getNumberOfConsumersOnQueue(QUEUE_NAME), 10_000, 100);
+         Wait.assertEquals((long) NUMBER_OF_MESSAGES, () -> 
simpleManagement.getMessageCountOnQueue(QUEUE_NAME), 10_000, 500);
+         Assert.assertEquals("There were errors in the consumers", 0, 
errors.get());
+      }
+   }
+
+}
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/clientFailure/ClientFailureSoakTestClient.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/clientFailure/ClientFailureSoakTestClient.java
new file mode 100644
index 0000000000..3ed8ebe43d
--- /dev/null
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/clientFailure/ClientFailureSoakTestClient.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.clientFailure;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+
+public class ClientFailureSoakTestClient {
+
+   static void exitVM(int code) {
+      // flipping a coin between halt and exit
+      if (RandomUtil.randomBoolean()) {
+         System.out.println("returning halt " + code);
+         Runtime.getRuntime().halt(code);
+      } else {
+         System.out.println("returning exit " + code);
+         System.exit(code);
+      }
+   }
+
+   public static final int RETURN_OK = 1;
+   public static final int RETURN_ERROR = 2;
+
+   public static void main(String[] arg) {
+      String protocol = arg[0];
+      int numberOfThreads = Integer.parseInt(arg[1]);
+      int numberOfConsumers = Integer.parseInt(arg[2]);
+      String queueName = arg[3];
+
+      try {
+         ConnectionFactory cf = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61616");
+         if (protocol.equals("OPENWIRE")) {
+            RedeliveryPolicy inifinitePolicy = new RedeliveryPolicy();
+            inifinitePolicy.setMaximumRedeliveries(-1);
+            
((ActiveMQConnectionFactory)cf).setRedeliveryPolicy(inifinitePolicy);
+         }
+         ExecutorService service = 
Executors.newFixedThreadPool(numberOfThreads);
+
+         CyclicBarrier consumersCreated = new CyclicBarrier(numberOfThreads + 
1);
+
+         for (int i = 0; i < numberOfThreads; i++) {
+
+            int threadID = i;
+            service.execute(() -> {
+               try {
+                  boolean tx = RandomUtil.randomBoolean(); // flip a coin if 
we are using TX or Auto-ACK
+                  Connection connection = cf.createConnection();
+                  connection.start();
+
+                  Session session = tx ? connection.createSession(true, 
Session.SESSION_TRANSACTED) : connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+                  for (int consI = 0; consI < numberOfConsumers; consI++) {
+                     MessageConsumer consumer = 
session.createConsumer(session.createQueue(queueName));
+                     if (consI == 0) {
+                        System.out.println("Consumers created");
+                        Session anotherSession = 
connection.createSession(true, Session.SESSION_TRANSACTED);
+                        MessageConsumer anotherConsumer = 
anotherSession.createConsumer(session.createQueue(queueName));
+                        consumersCreated.await(30, TimeUnit.SECONDS);
+                     }
+
+                     if (tx) {
+                        if (consumer.receiveNoWait() == null) {
+                           System.out.println("Nothing received");
+                        } else {
+                           System.out.println("received");
+                        }
+                     }
+
+                     if (RandomUtil.randomBoolean()) {
+                        connection.close();
+                        break;
+                     }
+                  }
+               } catch (Throwable e) {
+                  System.out.println("FAILURE:" + e.getMessage());
+                  e.printStackTrace(System.out);
+                  exitVM(RETURN_ERROR);
+               }
+            });
+
+         }
+
+         System.out.println("Sleeping");
+         consumersCreated.await(30, TimeUnit.SECONDS);
+         System.out.println("Done");
+
+         exitVM(RETURN_OK);
+      } catch (Throwable e) {
+         System.out.println("FAILURE:" + e.getMessage());
+         e.printStackTrace(System.out);
+         exitVM(RETURN_ERROR);
+      }
+   }
+
+}
diff --git a/tests/soak-tests/src/test/scripts/parameters.sh 
b/tests/soak-tests/src/test/scripts/parameters.sh
index 2c85432d68..568b108815 100755
--- a/tests/soak-tests/src/test/scripts/parameters.sh
+++ b/tests/soak-tests/src/test/scripts/parameters.sh
@@ -105,3 +105,34 @@ export TEST_PGDB_DB_LIST=derby
 export TEST_PGDB_MAX_MESSAGES=500
 export TEST_PGDB_MESSAGE_SIZE=100
 export TEST_PGDB_COMMIT_INTERVAL=50
+
+#ClientFailureSoakTest
+export TEST_CLIENT_FAILURE_TEST_ENABLED=true
+export TEST_CLIENT_FAILURE_PROTOCOL_LIST=AMQP,CORE,OPENWIRE
+
+export TEST_CLIENT_FAILURE_AMQP_USE_LARGE_MESSAGE=FALSE
+export TEST_CLIENT_FAILURE_AMQP_THREADS_PER_VM=20
+export TEST_CLIENT_FAILURE_AMQP_CLIENT_CONSUMERS_PER_THREAD=20
+export TEST_CLIENT_FAILURE_AMQP_TEST_REPEATS=1
+export TEST_CLIENT_FAILURE_AMQP_TOTAL_ITERATION=2
+export TEST_CLIENT_FAILURE_AMQP_NUMBER_OF_VMS=5
+export TEST_CLIENT_FAILURE_AMQP_NUMBER_OF_MESSAGES=20000
+export TEST_CLIENT_FAILURE_AMQP_MEMORY_CLIENT=-Xmx256m
+
+export TEST_CLIENT_FAILURE_CORE_USE_LARGE_MESSAGE=FALSE
+export TEST_CLIENT_FAILURE_CORE_THREADS_PER_VM=20
+export TEST_CLIENT_FAILURE_CORE_CLIENT_CONSUMERS_PER_THREAD=20
+export TEST_CLIENT_FAILURE_CORE_TEST_REPEATS=1
+export TEST_CLIENT_FAILURE_CORE_TOTAL_ITERATION=2
+export TEST_CLIENT_FAILURE_CORE_NUMBER_OF_VMS=5
+export TEST_CLIENT_FAILURE_CORE_NUMBER_OF_MESSAGES=20000
+export TEST_CLIENT_FAILURE_CORE_MEMORY_CLIENT=-Xmx256m
+
+export TEST_CLIENT_FAILURE_OPENWIRE_USE_LARGE_MESSAGE=FALSE
+export TEST_CLIENT_FAILURE_OPENWIRE_THREADS_PER_VM=20
+export TEST_CLIENT_FAILURE_OPENWIRE_CLIENT_CONSUMERS_PER_THREAD=20
+export TEST_CLIENT_FAILURE_OPENWIRE_TEST_REPEATS=1
+export TEST_CLIENT_FAILURE_OPENWIRE_TOTAL_ITERATION=2
+export TEST_CLIENT_FAILURE_OPENWIRE_NUMBER_OF_VMS=5
+export TEST_CLIENT_FAILURE_OPENWIRE_NUMBER_OF_MESSAGES=20000
+export TEST_CLIENT_FAILURE_OPENWIRE_MEMORY_CLIENT=-Xmx256m

Reply via email to