This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit c7ec3c7c7716167ba8b7e8619b0cd1afd5d64bbc
Author: Clebert Suconic <[email protected]>
AuthorDate: Fri Dec 13 12:13:24 2024 -0500

    ARTEMIS-5173 Removing test not well designed
    
    This test has a variable number of messages sent. it would need to be 
rewrittent and improved.
    Best to just let it go.
---
 .../distribution/ClusteredGroupingTest.java        | 251 ---------------------
 1 file changed, 251 deletions(-)

diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java
index 66e20083ee..83d2ab5d9b 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java
@@ -17,24 +17,13 @@
 package org.apache.activemq.artemis.tests.integration.cluster.distribution;
 
 import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
-import org.apache.activemq.artemis.api.core.client.ClientProducer;
-import org.apache.activemq.artemis.api.core.client.ClientSession;
-import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl;
 import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
@@ -46,9 +35,6 @@ import 
org.apache.activemq.artemis.core.server.group.impl.Proposal;
 import org.apache.activemq.artemis.core.server.group.impl.Response;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.server.management.Notification;
-import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -523,243 +509,6 @@ public class ClusteredGroupingTest extends 
ClusterTestBase {
       
assertNull(servers[0].getGroupingHandler().getProposal(SimpleString.of("id1.queue0"),
 false), "Group should have timed out");
    }
 
-   @Test
-   public void testGroupingWith3Nodes() throws Exception {
-      final String ADDRESS = "queues.testaddress";
-      final String QUEUE = "queue0";
-
-      setupServer(0, isFileStorage(), isNetty());
-      setupServer(1, isFileStorage(), isNetty());
-      setupServer(2, isFileStorage(), isNetty());
-
-      setupClusterConnection("cluster0", "queues", 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
-      setupClusterConnection("cluster1", "queues", 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
-      setupClusterConnection("cluster2", "queues", 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
-
-      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0, 10000, 
500, 750);
-      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 10000, 
500, 750);
-      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2, 10000, 
500, 750);
-
-      startServers(0, 1, 2);
-
-      AddressSettings addressSettings = new AddressSettings();
-      
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
-      servers[0].getAddressSettingsRepository().addMatch("#", addressSettings);
-      servers[1].getAddressSettingsRepository().addMatch("#", addressSettings);
-      servers[2].getAddressSettingsRepository().addMatch("#", addressSettings);
-
-      setupSessionFactory(0, isNetty());
-
-      // need to set up reconnect attempts on this session factory because the 
test will restart node 1
-      setupSessionFactory(1, isNetty(), 15);
-
-      setupSessionFactory(2, isNetty());
-
-      createQueue(0, ADDRESS, QUEUE, null, true);
-      createQueue(1, ADDRESS, QUEUE, null, true);
-      createQueue(2, ADDRESS, QUEUE, null, true);
-
-      waitForBindings(0, ADDRESS, 1, 0, true);
-      waitForBindings(1, ADDRESS, 1, 0, true);
-      waitForBindings(2, ADDRESS, 1, 0, true);
-
-      waitForBindings(0, ADDRESS, 2, 0, false);
-      waitForBindings(1, ADDRESS, 2, 0, false);
-      waitForBindings(2, ADDRESS, 2, 0, false);
-
-      final ClientSessionFactory sf0 = sfs[0];
-      final ClientSessionFactory sf1 = sfs[1];
-      final ClientSessionFactory sf2 = sfs[2];
-
-      final ClientSession session = addClientSession(sf1.createSession(false, 
false, false));
-      final ClientProducer producer = 
addClientProducer(session.createProducer(ADDRESS));
-      List<String> groups = new ArrayList<>();
-
-      final AtomicInteger totalMessageProduced = new AtomicInteger(0);
-
-      // create a bunch of groups and save a few group IDs for use later
-      for (int i = 0; i < 500; i++) {
-         ClientMessage message = session.createMessage(true);
-         String group = UUID.randomUUID().toString();
-         message.putStringProperty(Message.HDR_GROUP_ID, 
SimpleString.of(group));
-         SimpleString dupID = SimpleString.of(UUID.randomUUID().toString());
-         message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID);
-         if (i % 100 == 0) {
-            groups.add(group);
-         }
-         producer.send(message);
-         logger.trace("Sent message to server 1 with dupID: {}", dupID);
-      }
-
-      session.commit();
-      totalMessageProduced.addAndGet(500);
-      logger.trace("Sent block of 500 messages to server 1. Total sent: {}", 
totalMessageProduced.get());
-      session.close();
-
-      // need thread pool to service both consumers and producers plus a 
thread to cycle nodes
-      ExecutorService executorService = 
Executors.newFixedThreadPool(groups.size() * 2 + 1, 
ActiveMQThreadFactory.defaultThreadFactory(getClass().getName()));
-
-      final AtomicInteger producerCounter = new AtomicInteger(0);
-      final CountDownLatch okToConsume = new CountDownLatch(groups.size() + 1);
-
-      final AtomicInteger errors = new AtomicInteger(0);
-
-      final long timeToRun = System.currentTimeMillis() + 5000;
-
-      // spin up a bunch of threads to pump messages into some of the groups
-      for (final String groupx : groups) {
-         final Runnable r = () -> {
-
-            String group = groupx;
-
-            String basicID = UUID.randomUUID().toString();
-            logger.debug("Starting producer thread...");
-            ClientSessionFactory factory;
-            ClientSession session12 = null;
-            ClientProducer producer1 = null;
-            int targetServer = 0;
-
-            try {
-
-               int count = producerCounter.incrementAndGet();
-               if (count % 3 == 0) {
-                  factory = sf2;
-                  targetServer = 2;
-               } else if (count % 2 == 0) {
-                  factory = sf1;
-                  targetServer = 1;
-               } else {
-                  factory = sf0;
-               }
-               logger.debug("Creating producer session factory to node {}", 
targetServer);
-               session12 = addClientSession(factory.createSession(false, true, 
true));
-               producer1 = 
addClientProducer(session12.createProducer(ADDRESS));
-            } catch (Exception e) {
-               errors.incrementAndGet();
-               logger.warn("Producer thread couldn't establish connection", e);
-               return;
-            }
-
-            int messageCount = 0;
-
-            while (timeToRun > System.currentTimeMillis()) {
-               ClientMessage message = session12.createMessage(true);
-               message.putStringProperty(Message.HDR_GROUP_ID, 
SimpleString.of(group));
-               SimpleString dupID = SimpleString.of(basicID + ":" + 
messageCount);
-               message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, 
dupID);
-               try {
-                  producer1.send(message);
-                  totalMessageProduced.incrementAndGet();
-                  messageCount++;
-               } catch (ActiveMQException e) {
-                  logger.warn("Producer thread threw exception while sending 
messages to {}: {}", targetServer, e.getMessage());
-                  // in case of a failure we change the group to make possible 
errors more likely
-                  group = group + "afterFail";
-               } catch (Exception e) {
-                  logger.warn("Producer thread threw unexpected exception 
while sending messages to {}: {}", targetServer, e.getMessage());
-                  group = group + "afterFail";
-                  break;
-               }
-            }
-
-            okToConsume.countDown();
-         };
-
-         executorService.execute(r);
-      }
-
-      Runnable r = () -> {
-         try {
-            try {
-               Thread.sleep(2000);
-            } catch (InterruptedException e) {
-               e.printStackTrace();
-               // ignore
-            }
-            cycleServer(1);
-         } finally {
-            okToConsume.countDown();
-         }
-      };
-
-      executorService.execute(r);
-
-      final AtomicInteger consumerCounter = new AtomicInteger(0);
-      final AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
-      final CountDownLatch okToEndTest = new CountDownLatch(groups.size());
-
-      // spin up a bunch of threads to consume messages
-      for (final String group : groups) {
-         r = () -> {
-            try {
-               logger.debug("Waiting to start consumer thread...");
-               okToConsume.await(20, TimeUnit.SECONDS);
-            } catch (InterruptedException e) {
-               e.printStackTrace();
-               return;
-            }
-            logger.debug("Starting consumer thread...");
-            ClientSessionFactory factory;
-            ClientSession session1 = null;
-            ClientConsumer consumer = null;
-            int targetServer = 0;
-
-            try {
-               synchronized (consumerCounter) {
-                  if (consumerCounter.get() % 3 == 0) {
-                     factory = sf2;
-                     targetServer = 2;
-                  } else if (consumerCounter.get() % 2 == 0) {
-                     factory = sf1;
-                     targetServer = 1;
-                  } else {
-                     factory = sf0;
-                  }
-                  logger.debug("Creating consumer session factory to node {}", 
targetServer);
-                  session1 = addClientSession(factory.createSession(false, 
false, true));
-                  consumer = addClientConsumer(session1.createConsumer(QUEUE));
-                  session1.start();
-                  consumerCounter.incrementAndGet();
-               }
-            } catch (Exception e) {
-               logger.debug("Consumer thread couldn't establish connection", 
e);
-               errors.incrementAndGet();
-               return;
-            }
-
-            while (true) {
-               try {
-                  ClientMessage m = consumer.receive(1000);
-                  if (m == null) {
-                     okToEndTest.countDown();
-                     return;
-                  }
-                  m.acknowledge();
-                  logger.trace("Consumed message {} from server {}. Total 
consumed: {}", m.getStringProperty(Message.HDR_DUPLICATE_DETECTION_ID), 
targetServer, totalMessagesConsumed.incrementAndGet());
-               } catch (ActiveMQException e) {
-                  errors.incrementAndGet();
-                  logger.warn("Consumer thread threw exception while receiving 
messages from server {}.: {}", targetServer, e.getMessage());
-               } catch (Exception e) {
-                  errors.incrementAndGet();
-                  logger.warn("Consumer thread threw unexpected exception 
while receiving messages from server {}.: {}", targetServer, e.getMessage());
-                  return;
-               }
-            }
-         };
-
-         executorService.execute(r);
-      }
-      // wait for the threads to complete their consuming
-      okToEndTest.await(20, TimeUnit.SECONDS);
-
-      executorService.shutdownNow();
-      executorService.awaitTermination(10, TimeUnit.SECONDS);
-
-      assertEquals(0, errors.get());
-
-      assertEquals(totalMessageProduced.longValue(), 
totalMessagesConsumed.longValue());
-   }
-
    private void cycleServer(int node) {
       try {
          stopServers(node);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to