This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit c7ec3c7c7716167ba8b7e8619b0cd1afd5d64bbc Author: Clebert Suconic <[email protected]> AuthorDate: Fri Dec 13 12:13:24 2024 -0500 ARTEMIS-5173 Removing test not well designed This test has a variable number of messages sent. It would need to be rewritten and improved. Best to just let it go. --- .../distribution/ClusteredGroupingTest.java | 251 --------------------- 1 file changed, 251 deletions(-) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java index 66e20083ee..83d2ab5d9b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java @@ -17,24 +17,13 @@ package org.apache.activemq.artemis.tests.integration.cluster.distribution; import java.lang.invoke.MethodHandles; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; -import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; -import org.apache.activemq.artemis.api.core.client.ClientProducer; -import org.apache.activemq.artemis.api.core.client.ClientSession; -import 
org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; @@ -46,9 +35,6 @@ import org.apache.activemq.artemis.core.server.group.impl.Proposal; import org.apache.activemq.artemis.core.server.group.impl.Response; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.server.management.Notification; -import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; -import org.apache.activemq.artemis.core.settings.impl.AddressSettings; -import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -523,243 +509,6 @@ public class ClusteredGroupingTest extends ClusterTestBase { assertNull(servers[0].getGroupingHandler().getProposal(SimpleString.of("id1.queue0"), false), "Group should have timed out"); } - @Test - public void testGroupingWith3Nodes() throws Exception { - final String ADDRESS = "queues.testaddress"; - final String QUEUE = "queue0"; - - setupServer(0, isFileStorage(), isNetty()); - setupServer(1, isFileStorage(), isNetty()); - setupServer(2, isFileStorage(), isNetty()); - - setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1); - - setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0, 10000, 500, 750); - setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 10000, 500, 750); - setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2, 10000, 500, 750); - - startServers(0, 1, 2); - - 
AddressSettings addressSettings = new AddressSettings(); - addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); - servers[0].getAddressSettingsRepository().addMatch("#", addressSettings); - servers[1].getAddressSettingsRepository().addMatch("#", addressSettings); - servers[2].getAddressSettingsRepository().addMatch("#", addressSettings); - - setupSessionFactory(0, isNetty()); - - // need to set up reconnect attempts on this session factory because the test will restart node 1 - setupSessionFactory(1, isNetty(), 15); - - setupSessionFactory(2, isNetty()); - - createQueue(0, ADDRESS, QUEUE, null, true); - createQueue(1, ADDRESS, QUEUE, null, true); - createQueue(2, ADDRESS, QUEUE, null, true); - - waitForBindings(0, ADDRESS, 1, 0, true); - waitForBindings(1, ADDRESS, 1, 0, true); - waitForBindings(2, ADDRESS, 1, 0, true); - - waitForBindings(0, ADDRESS, 2, 0, false); - waitForBindings(1, ADDRESS, 2, 0, false); - waitForBindings(2, ADDRESS, 2, 0, false); - - final ClientSessionFactory sf0 = sfs[0]; - final ClientSessionFactory sf1 = sfs[1]; - final ClientSessionFactory sf2 = sfs[2]; - - final ClientSession session = addClientSession(sf1.createSession(false, false, false)); - final ClientProducer producer = addClientProducer(session.createProducer(ADDRESS)); - List<String> groups = new ArrayList<>(); - - final AtomicInteger totalMessageProduced = new AtomicInteger(0); - - // create a bunch of groups and save a few group IDs for use later - for (int i = 0; i < 500; i++) { - ClientMessage message = session.createMessage(true); - String group = UUID.randomUUID().toString(); - message.putStringProperty(Message.HDR_GROUP_ID, SimpleString.of(group)); - SimpleString dupID = SimpleString.of(UUID.randomUUID().toString()); - message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID); - if (i % 100 == 0) { - groups.add(group); - } - producer.send(message); - logger.trace("Sent message to server 1 with dupID: {}", dupID); - } - - session.commit(); 
- totalMessageProduced.addAndGet(500); - logger.trace("Sent block of 500 messages to server 1. Total sent: {}", totalMessageProduced.get()); - session.close(); - - // need thread pool to service both consumers and producers plus a thread to cycle nodes - ExecutorService executorService = Executors.newFixedThreadPool(groups.size() * 2 + 1, ActiveMQThreadFactory.defaultThreadFactory(getClass().getName())); - - final AtomicInteger producerCounter = new AtomicInteger(0); - final CountDownLatch okToConsume = new CountDownLatch(groups.size() + 1); - - final AtomicInteger errors = new AtomicInteger(0); - - final long timeToRun = System.currentTimeMillis() + 5000; - - // spin up a bunch of threads to pump messages into some of the groups - for (final String groupx : groups) { - final Runnable r = () -> { - - String group = groupx; - - String basicID = UUID.randomUUID().toString(); - logger.debug("Starting producer thread..."); - ClientSessionFactory factory; - ClientSession session12 = null; - ClientProducer producer1 = null; - int targetServer = 0; - - try { - - int count = producerCounter.incrementAndGet(); - if (count % 3 == 0) { - factory = sf2; - targetServer = 2; - } else if (count % 2 == 0) { - factory = sf1; - targetServer = 1; - } else { - factory = sf0; - } - logger.debug("Creating producer session factory to node {}", targetServer); - session12 = addClientSession(factory.createSession(false, true, true)); - producer1 = addClientProducer(session12.createProducer(ADDRESS)); - } catch (Exception e) { - errors.incrementAndGet(); - logger.warn("Producer thread couldn't establish connection", e); - return; - } - - int messageCount = 0; - - while (timeToRun > System.currentTimeMillis()) { - ClientMessage message = session12.createMessage(true); - message.putStringProperty(Message.HDR_GROUP_ID, SimpleString.of(group)); - SimpleString dupID = SimpleString.of(basicID + ":" + messageCount); - message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID); - try { - 
producer1.send(message); - totalMessageProduced.incrementAndGet(); - messageCount++; - } catch (ActiveMQException e) { - logger.warn("Producer thread threw exception while sending messages to {}: {}", targetServer, e.getMessage()); - // in case of a failure we change the group to make possible errors more likely - group = group + "afterFail"; - } catch (Exception e) { - logger.warn("Producer thread threw unexpected exception while sending messages to {}: {}", targetServer, e.getMessage()); - group = group + "afterFail"; - break; - } - } - - okToConsume.countDown(); - }; - - executorService.execute(r); - } - - Runnable r = () -> { - try { - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - e.printStackTrace(); - // ignore - } - cycleServer(1); - } finally { - okToConsume.countDown(); - } - }; - - executorService.execute(r); - - final AtomicInteger consumerCounter = new AtomicInteger(0); - final AtomicInteger totalMessagesConsumed = new AtomicInteger(0); - final CountDownLatch okToEndTest = new CountDownLatch(groups.size()); - - // spin up a bunch of threads to consume messages - for (final String group : groups) { - r = () -> { - try { - logger.debug("Waiting to start consumer thread..."); - okToConsume.await(20, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - return; - } - logger.debug("Starting consumer thread..."); - ClientSessionFactory factory; - ClientSession session1 = null; - ClientConsumer consumer = null; - int targetServer = 0; - - try { - synchronized (consumerCounter) { - if (consumerCounter.get() % 3 == 0) { - factory = sf2; - targetServer = 2; - } else if (consumerCounter.get() % 2 == 0) { - factory = sf1; - targetServer = 1; - } else { - factory = sf0; - } - logger.debug("Creating consumer session factory to node {}", targetServer); - session1 = addClientSession(factory.createSession(false, false, true)); - consumer = addClientConsumer(session1.createConsumer(QUEUE)); - session1.start(); - 
consumerCounter.incrementAndGet(); - } - } catch (Exception e) { - logger.debug("Consumer thread couldn't establish connection", e); - errors.incrementAndGet(); - return; - } - - while (true) { - try { - ClientMessage m = consumer.receive(1000); - if (m == null) { - okToEndTest.countDown(); - return; - } - m.acknowledge(); - logger.trace("Consumed message {} from server {}. Total consumed: {}", m.getStringProperty(Message.HDR_DUPLICATE_DETECTION_ID), targetServer, totalMessagesConsumed.incrementAndGet()); - } catch (ActiveMQException e) { - errors.incrementAndGet(); - logger.warn("Consumer thread threw exception while receiving messages from server {}.: {}", targetServer, e.getMessage()); - } catch (Exception e) { - errors.incrementAndGet(); - logger.warn("Consumer thread threw unexpected exception while receiving messages from server {}.: {}", targetServer, e.getMessage()); - return; - } - } - }; - - executorService.execute(r); - } - // wait for the threads to complete their consuming - okToEndTest.await(20, TimeUnit.SECONDS); - - executorService.shutdownNow(); - executorService.awaitTermination(10, TimeUnit.SECONDS); - - assertEquals(0, errors.get()); - - assertEquals(totalMessageProduced.longValue(), totalMessagesConsumed.longValue()); - } - private void cycleServer(int node) { try { stopServers(node); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected] For further information, visit: https://activemq.apache.org/contact
