This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit ab6f0a303292f68341f7ea400df813dbc4f57a2d
Author: Justin Bertram <[email protected]>
AuthorDate: Fri Oct 6 11:25:29 2023 -0500

    ARTEMIS-4453 Lots of addresses breaks cluster bridge flow control
---
 .../client/impl/AbstractProducerCreditsImpl.java   |  1 +
 .../impl/AsynchronousProducerCreditsImpl.java      | 13 ----
 .../impl/ClientProducerCreditManagerImpl.java      | 60 ++++++++++------
 .../core/client/impl/ClientProducerCredits.java    |  4 +-
 .../client/impl/ClientProducerCreditsImpl.java     |  6 --
 .../core/client/impl/ClientSessionFactoryImpl.java |  2 +-
 .../core/client/impl/ClientSessionImpl.java        |  6 +-
 .../distribution/SimpleSymmetricClusterTest.java   | 81 ++++++++++++++++++++++
 8 files changed, 128 insertions(+), 45 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
index 2e341fa9b3..a2fe15ffe2 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
@@ -149,6 +149,7 @@ public abstract class AbstractProducerCreditsImpl implements ClientProducerCredi
       return --refCount;
    }
 
+   @Override
    public abstract int getBalance();
 
    protected void checkCredits(final int credits) {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java
index a6a61d38a1..fecfe5904f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AsynchronousProducerCreditsImpl.java
@@ -76,7 +76,6 @@ public class AsynchronousProducerCreditsImpl extends AbstractProducerCreditsImpl
 
    }
 
-
    @Override
    public void receiveFailCredits(final int credits) {
       super.receiveFailCredits(credits);
@@ -85,16 +84,4 @@ public class AsynchronousProducerCreditsImpl extends AbstractProducerCreditsImpl
       }
       callback.onCreditsFail(this);
    }
-
-   @Override
-   public void releaseOutstanding() {
-      synchronized (this) {
-         balance = 0;
-         callback.onCreditsFlow(true, this);
-         if (logger.isDebugEnabled()) {
-            logger.debug("releaseOutstanding credits, balance={}, callback={}", balance, callback.getClass());
-         }
-      }
-
-   }
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
index cd2db9cc2c..c71082be5a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
@@ -19,6 +19,9 @@ package org.apache.activemq.artemis.core.client.impl;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
@@ -37,10 +40,16 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
 
    private ClientProducerFlowCallback callback;
 
-   public ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize) {
+   private final ScheduledExecutorService scheduledThreadPool;
+
+   private ScheduledFuture future;
+
+   public ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize, ScheduledExecutorService scheduledThreadPool) {
       this.session = session;
 
       this.windowSize = windowSize;
+
+      this.scheduledThreadPool = scheduledThreadPool;
    }
 
 
@@ -146,6 +155,10 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
       producerCredits.clear();
 
       unReferencedCredits.clear();
+
+      if (future != null) {
+         future.cancel(false);
+      }
    }
 
    @Override
@@ -162,26 +175,30 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
       unReferencedCredits.put(address, credits);
 
       if (unReferencedCredits.size() > MAX_UNREFERENCED_CREDITS_CACHE_SIZE) {
-         // Remove the oldest entry
-
-         Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = unReferencedCredits.entrySet().iterator();
-
-         Map.Entry<SimpleString, ClientProducerCredits> oldest = iter.next();
-
-         iter.remove();
-
-         removeEntry(oldest.getKey(), oldest.getValue());
+         // if we've exceeded our limit then try to clean up
+         if (future == null) {
+            future = scheduledThreadPool.scheduleWithFixedDelay(() -> {
+               synchronized (this) {
+                  Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = unReferencedCredits.entrySet().iterator();
+                  while (iter.hasNext()) {
+                     Map.Entry<SimpleString, ClientProducerCredits> entry = iter.next();
+                     if (entry.getValue().getBalance() == 0) {
+                        iter.remove();
+                        producerCredits.remove(entry.getKey());
+                        entry.getValue().close();
+                     }
+                  }
+               }
+            }, 0, 30, TimeUnit.SECONDS);
+         }
+      } else {
+         // if we're below our limit make sure we're not trying to clean up
+         if (future != null) {
+            future.cancel(false);
+         }
       }
    }
 
-   private void removeEntry(final SimpleString address, final ClientProducerCredits credits) {
-      producerCredits.remove(address);
-
-      credits.releaseOutstanding();
-
-      credits.close();
-   }
-
    static class ClientProducerCreditsNoFlowControl implements ClientProducerCredits {
 
       static ClientProducerCreditsNoFlowControl instance = new ClientProducerCreditsNoFlowControl();
@@ -225,12 +242,13 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
       }
 
       @Override
-      public void releaseOutstanding() {
+      public SimpleString getAddress() {
+         return SimpleString.toSimpleString("");
       }
 
       @Override
-      public SimpleString getAddress() {
-         return SimpleString.toSimpleString("");
+      public int getBalance() {
+         return 0;
       }
    }
 
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
index 321fda5e89..78c37fc0d8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
@@ -40,7 +40,7 @@ public interface ClientProducerCredits {
 
    int decrementRefCount();
 
-   void releaseOutstanding();
-
    SimpleString getAddress();
+
+   int getBalance();
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
index 41a08c9d4d..e4ce6a843f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java
@@ -112,12 +112,6 @@ public class ClientProducerCreditsImpl extends AbstractProducerCreditsImpl {
       semaphore.release(credits);
    }
 
-
-   @Override
-   public synchronized void releaseOutstanding() {
-      semaphore.drainPermits();
-   }
-
    @Override
    public int getBalance() {
       return semaphore.availablePermits();
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index 86d0b9465d..e4968245d6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -836,7 +836,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
 
       SessionContext context = createSessionChannel(name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, clientID);
 
-      ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSe [...]
+      ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSe [...]
 
       synchronized (sessions) {
          if (closed || !clientProtocolManager.isAlive()) {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 54a2b86cbb..2a825b6d6b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -26,6 +26,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
@@ -193,7 +194,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
                      final Executor executor,
                      final Executor confirmationExecutor,
                      final Executor flowControlExecutor,
-                     final Executor closeExecutor) throws ActiveMQException {
+                     final Executor closeExecutor,
+                     final ScheduledExecutorService scheduledThreadPool) throws ActiveMQException {
       this.sessionFactory = sessionFactory;
 
       this.name = name;
@@ -246,7 +248,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
 
       this.groupID = groupID;
 
-      producerCreditManager = new ClientProducerCreditManagerImpl(this, producerWindowSize);
+      producerCreditManager = new ClientProducerCreditManagerImpl(this, producerWindowSize, scheduledThreadPool);
 
       this.sessionContext = sessionContext;
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
index 8f53f2b424..b647ef6a7b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
@@ -20,7 +20,12 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
@@ -34,6 +39,10 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 public class SimpleSymmetricClusterTest extends ClusterTestBase {
 
@@ -127,6 +136,78 @@ public class SimpleSymmetricClusterTest extends ClusterTestBase {
 
    }
 
+   @Test
+   public void testWildcardFlowControl() throws Exception {
+      final int PRODUCER_COUNT = 3000;
+      final int ITERATIONS = 4;
+      final CountDownLatch consumerLatch = new CountDownLatch(PRODUCER_COUNT * ITERATIONS);
+      final CountDownLatch producerLatch = new CountDownLatch(PRODUCER_COUNT * ITERATIONS);
+
+      setupServer(0, true, isNetty());
+      setupServer(1, true, isNetty());
+      servers[0].getConfiguration().setWildcardRoutingEnabled(true);
+      servers[1].getConfiguration().setWildcardRoutingEnabled(true);
+
+      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+      setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
+
+      startServers(0, 1);
+
+      waitForTopology(servers[0], 2);
+      waitForTopology(servers[1], 2);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+
+      logger.info("Creating " + PRODUCER_COUNT + " multicast addresses on node 1...");
+      for (int i = 0; i < PRODUCER_COUNT; i++) {
+         createAddressInfo(1, "queues." + i, RoutingType.MULTICAST, -1, false);
+      }
+      logger.info("Addresses created.");
+
+      createQueue(0, "queues.#", "queue", null, false, RoutingType.MULTICAST);
+
+      logger.info("Creating consumer on node 0");
+      addConsumer(0, 0, "queue", null);
+
+      consumers[0].getConsumer().setMessageHandler(message -> {
+         logger.debug("Received: " + message);
+         consumerLatch.countDown();
+      });
+
+      waitForBindings(0, "queues.#", 1, 1, true);
+
+      logger.info("Creating " + PRODUCER_COUNT + " producers...");
+      ClientProducer[] producers = new ClientProducer[PRODUCER_COUNT];
+      ClientSession[] sessions = new ClientSession[PRODUCER_COUNT];
+      for (int i = 0; i < PRODUCER_COUNT; i++) {
+         ClientSessionFactory sf = sfs[1];
+         sessions[i] = addClientSession(sf.createSession(true, true, 0));
+         producers[i] = addClientProducer(sessions[i].createProducer("queues." + i));
+      }
+
+      ExecutorService executorService = Executors.newFixedThreadPool(PRODUCER_COUNT);
+      runAfter(executorService::shutdownNow);
+      for (int i = 0; i < PRODUCER_COUNT; i++) {
+         final ClientProducer producer = producers[i];
+         final ClientMessage message = sessions[i].createMessage(true);
+         executorService.submit(() -> {
+            for (int j = 0; j < ITERATIONS; j++) {
+               try {
+                  producer.send(message);
+                  producerLatch.countDown();
+                  logger.debug("Sent message");
+               } catch (Exception e) {
+                  logger.error(e.getMessage(), e);
+               }
+            }
+         });
+      }
+      producerLatch.await(30, TimeUnit.SECONDS);
+      logger.info("Waiting for messages on node 0");
+      assertTrue(consumerLatch.await(30, TimeUnit.SECONDS));
+   }
+
    @Test
    public void testSimpleRestartClusterConnection() throws Exception {
       setupServer(0, true, isNetty());

Reply via email to