This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit ab6f0a303292f68341f7ea400df813dbc4f57a2d Author: Justin Bertram <[email protected]> AuthorDate: Fri Oct 6 11:25:29 2023 -0500 ARTEMIS-4453 Lots of addresses breaks cluster bridge flow control --- .../client/impl/AbstractProducerCreditsImpl.java | 1 + .../impl/AsynchronousProducerCreditsImpl.java | 13 ---- .../impl/ClientProducerCreditManagerImpl.java | 60 ++++++++++------ .../core/client/impl/ClientProducerCredits.java | 4 +- .../client/impl/ClientProducerCreditsImpl.java | 6 -- .../core/client/impl/ClientSessionFactoryImpl.java | 2 +- .../core/client/impl/ClientSessionImpl.java | 6 +- .../distribution/SimpleSymmetricClusterTest.java | 81 ++++++++++++++++++++++ 8 files changed, 128 insertions(+), 45 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java index 2e341fa9b3..a2fe15ffe2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java @@ -149,6 +149,7 @@ public abstract class AbstractProducerCreditsImpl implements ClientProducerCredi return --refCount; } + @Override public abstract int getBalance(); protected void checkCredits(final int credits) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java index a6a61d38a1..fecfe5904f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java @@ -76,7 +76,6 
@@ public class AsynchronousProducerCreditsImpl extends AbstractProducerCreditsImpl } - @Override public void receiveFailCredits(final int credits) { super.receiveFailCredits(credits); @@ -85,16 +84,4 @@ public class AsynchronousProducerCreditsImpl extends AbstractProducerCreditsImpl } callback.onCreditsFail(this); } - - @Override - public void releaseOutstanding() { - synchronized (this) { - balance = 0; - callback.onCreditsFlow(true, this); - if (logger.isDebugEnabled()) { - logger.debug("releaseOutstanding credits, balance={}, callback={}", balance, callback.getClass()); - } - } - - } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java index cd2db9cc2c..c71082be5a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java @@ -19,6 +19,9 @@ package org.apache.activemq.artemis.core.client.impl; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; @@ -37,10 +40,16 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana private ClientProducerFlowCallback callback; - public ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize) { + private final ScheduledExecutorService scheduledThreadPool; + + private ScheduledFuture future; + + public ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize, 
ScheduledExecutorService scheduledThreadPool) { this.session = session; this.windowSize = windowSize; + + this.scheduledThreadPool = scheduledThreadPool; } @@ -146,6 +155,10 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana producerCredits.clear(); unReferencedCredits.clear(); + + if (future != null) { + future.cancel(false); + } } @Override @@ -162,26 +175,30 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana unReferencedCredits.put(address, credits); if (unReferencedCredits.size() > MAX_UNREFERENCED_CREDITS_CACHE_SIZE) { - // Remove the oldest entry - - Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = unReferencedCredits.entrySet().iterator(); - - Map.Entry<SimpleString, ClientProducerCredits> oldest = iter.next(); - - iter.remove(); - - removeEntry(oldest.getKey(), oldest.getValue()); + // if we've exceeded our limit then try to clean up + if (future == null) { + future = scheduledThreadPool.scheduleWithFixedDelay(() -> { + synchronized (this) { + Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = unReferencedCredits.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry<SimpleString, ClientProducerCredits> entry = iter.next(); + if (entry.getValue().getBalance() == 0) { + iter.remove(); + producerCredits.remove(entry.getKey()); + entry.getValue().close(); + } + } + } + }, 0, 30, TimeUnit.SECONDS); + } + } else { + // if we're below our limit make sure we're not trying to clean up + if (future != null) { + future.cancel(false); + } } } - private void removeEntry(final SimpleString address, final ClientProducerCredits credits) { - producerCredits.remove(address); - - credits.releaseOutstanding(); - - credits.close(); - } - static class ClientProducerCreditsNoFlowControl implements ClientProducerCredits { static ClientProducerCreditsNoFlowControl instance = new ClientProducerCreditsNoFlowControl(); @@ -225,12 +242,13 @@ public class ClientProducerCreditManagerImpl 
implements ClientProducerCreditMana } @Override - public void releaseOutstanding() { + public SimpleString getAddress() { + return SimpleString.toSimpleString(""); } @Override - public SimpleString getAddress() { - return SimpleString.toSimpleString(""); + public int getBalance() { + return 0; } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java index 321fda5e89..78c37fc0d8 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java @@ -40,7 +40,7 @@ public interface ClientProducerCredits { int decrementRefCount(); - void releaseOutstanding(); - SimpleString getAddress(); + + int getBalance(); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java index 41a08c9d4d..e4ce6a843f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java @@ -112,12 +112,6 @@ public class ClientProducerCreditsImpl extends AbstractProducerCreditsImpl { semaphore.release(credits); } - - @Override - public synchronized void releaseOutstanding() { - semaphore.drainPermits(); - } - @Override public int getBalance() { return semaphore.availablePermits(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index 
86d0b9465d..e4968245d6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -836,7 +836,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C SessionContext context = createSessionChannel(name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, clientID); - ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSe [...] + ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSe [...] 
synchronized (sessions) { if (closed || !clientProtocolManager.isAlive()) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index 54a2b86cbb..2a825b6d6b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -26,6 +26,7 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; @@ -193,7 +194,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi final Executor executor, final Executor confirmationExecutor, final Executor flowControlExecutor, - final Executor closeExecutor) throws ActiveMQException { + final Executor closeExecutor, + final ScheduledExecutorService scheduledThreadPool) throws ActiveMQException { this.sessionFactory = sessionFactory; this.name = name; @@ -246,7 +248,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi this.groupID = groupID; - producerCreditManager = new ClientProducerCreditManagerImpl(this, producerWindowSize); + producerCreditManager = new ClientProducerCreditManagerImpl(this, producerWindowSize, scheduledThreadPool); this.sessionContext = sessionContext; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java index 8f53f2b424..b647ef6a7b 100644 --- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java @@ -20,7 +20,12 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord; @@ -34,6 +39,10 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; public class SimpleSymmetricClusterTest extends ClusterTestBase { @@ -127,6 +136,78 @@ public class SimpleSymmetricClusterTest extends ClusterTestBase { } + @Test + public void testWildcardFlowControl() throws Exception { + final int PRODUCER_COUNT = 3000; + final int ITERATIONS = 4; + final CountDownLatch consumerLatch = new CountDownLatch(PRODUCER_COUNT * ITERATIONS); + final CountDownLatch producerLatch = new CountDownLatch(PRODUCER_COUNT * ITERATIONS); + + setupServer(0, true, isNetty()); + setupServer(1, true, isNetty()); + servers[0].getConfiguration().setWildcardRoutingEnabled(true); + servers[1].getConfiguration().setWildcardRoutingEnabled(true); + + setupClusterConnection("cluster0", 
"queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0); + + startServers(0, 1); + + waitForTopology(servers[0], 2); + waitForTopology(servers[1], 2); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + + logger.info("Creating " + PRODUCER_COUNT + " multicast addresses on node 1..."); + for (int i = 0; i < PRODUCER_COUNT; i++) { + createAddressInfo(1, "queues." + i, RoutingType.MULTICAST, -1, false); + } + logger.info("Addresses created."); + + createQueue(0, "queues.#", "queue", null, false, RoutingType.MULTICAST); + + logger.info("Creating consumer on node 0"); + addConsumer(0, 0, "queue", null); + + consumers[0].getConsumer().setMessageHandler(message -> { + logger.debug("Received: " + message); + consumerLatch.countDown(); + }); + + waitForBindings(0, "queues.#", 1, 1, true); + + logger.info("Creating " + PRODUCER_COUNT + " producers..."); + ClientProducer[] producers = new ClientProducer[PRODUCER_COUNT]; + ClientSession[] sessions = new ClientSession[PRODUCER_COUNT]; + for (int i = 0; i < PRODUCER_COUNT; i++) { + ClientSessionFactory sf = sfs[1]; + sessions[i] = addClientSession(sf.createSession(true, true, 0)); + producers[i] = addClientProducer(sessions[i].createProducer("queues." 
+ i)); + } + + ExecutorService executorService = Executors.newFixedThreadPool(PRODUCER_COUNT); + runAfter(executorService::shutdownNow); + for (int i = 0; i < PRODUCER_COUNT; i++) { + final ClientProducer producer = producers[i]; + final ClientMessage message = sessions[i].createMessage(true); + executorService.submit(() -> { + for (int j = 0; j < ITERATIONS; j++) { + try { + producer.send(message); + producerLatch.countDown(); + logger.debug("Sent message"); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + }); + } + producerLatch.await(30, TimeUnit.SECONDS); + logger.info("Waiting for messages on node 0"); + assertTrue(consumerLatch.await(30, TimeUnit.SECONDS)); + } + @Test public void testSimpleRestartClusterConnection() throws Exception { setupServer(0, true, isNetty());
