This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 45f27895319db445d71276f6a90ac090c35c7da1
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Oct 9 15:44:53 2023 -0400

    ARTEMIS-4453 Asynchronous flow control broken when more than 1000 destinations in the producer
---
 .../client/impl/AbstractProducerCreditsImpl.java   |  20 ++--
 .../client/impl/ClientProducerCreditManager.java   |   2 +-
 .../impl/ClientProducerCreditManagerImpl.java      |  89 ++++++++--------
 .../core/client/impl/ClientProducerCredits.java    |   2 +
 .../core/client/impl/ClientSessionImpl.java        |   2 +-
 .../client/ProducerFlowControlTest.java            | 113 +++++++++++++++------
 6 files changed, 141 insertions(+), 87 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
index a2fe15ffe2..ac83c29b1a 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java
@@ -59,6 +59,11 @@ public abstract class AbstractProducerCreditsImpl implements 
ClientProducerCredi
       this.windowSize = windowSize / 2;
    }
 
+   @Override
+   public int getArriving() {
+      return arriving;
+   }
+
    @Override
    public SimpleString getAddress() {
       return address;
@@ -86,13 +91,12 @@ public abstract class AbstractProducerCreditsImpl 
implements ClientProducerCredi
    }
 
    protected void afterAcquired(int credits) throws 
ActiveMQAddressFullException {
-      if (logger.isDebugEnabled()) {
-         logger.debug("AfterAcquire {} credits on address {}", credits, 
address);
-      }
-
       synchronized (this) {
          pendingCredits -= credits;
       }
+      if (logger.isDebugEnabled()) {
+         logger.debug("AfterAcquire {} credits on address {}, 
pendingCredits={}", credits, address, pendingCredits);
+      }
    }
 
    protected abstract void actualAcquire(int credits);
@@ -149,9 +153,6 @@ public abstract class AbstractProducerCreditsImpl 
implements ClientProducerCredi
       return --refCount;
    }
 
-   @Override
-   public abstract int getBalance();
-
    protected void checkCredits(final int credits) {
       int needed = Math.max(credits, windowSize);
       if (logger.isTraceEnabled()) {
@@ -192,4 +193,9 @@ public abstract class AbstractProducerCreditsImpl 
implements ClientProducerCredi
       }
       session.sendProducerCreditsMessage(credits, address);
    }
+
+   @Override
+   public String toString() {
+      return this.getClass().getName() + "{" + "pendingCredits=" + 
pendingCredits + ", windowSize=" + windowSize + ", closed=" + closed + ", 
blocked=" + blocked + ", address=" + address + ", arriving=" + arriving + ", 
refCount=" + refCount + ", serverRespondedWithFail=" + serverRespondedWithFail 
+ '}';
+   }
 }
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
index bb65f72c7a..3cdf6ed44f 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java
@@ -35,7 +35,7 @@ public interface ClientProducerCreditManager {
 
    int creditsMapSize();
 
-   int unReferencedCreditsSize();
+   int getMaxAnonymousCacheSize();
 
    /** This will determine the flow control as asynchronous,
     *  no actual block should happen instead a callback will be sent whenever 
blockages change  */
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
index c71082be5a..fba2928343 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java
@@ -16,23 +16,25 @@
  */
 package org.apache.activemq.artemis.core.client.impl;
 
+import java.lang.invoke.MethodHandles;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ClientProducerCreditManagerImpl implements 
ClientProducerCreditManager {
 
-   public static final int MAX_UNREFERENCED_CREDITS_CACHE_SIZE = 1000;
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public static final int MAX_ANONYMOUS_CREDITS_CACHE_SIZE = 1000;
 
    private final Map<SimpleString, ClientProducerCredits> producerCredits = 
new LinkedHashMap<>();
 
-   private final Map<SimpleString, ClientProducerCredits> unReferencedCredits 
= new LinkedHashMap<>();
+   private final Map<SimpleString, ClientProducerCredits> anonymousCredits = 
new LinkedHashMap<>();
 
    private final ClientSessionInternal session;
 
@@ -40,16 +42,10 @@ public class ClientProducerCreditManagerImpl implements 
ClientProducerCreditMana
 
    private ClientProducerFlowCallback callback;
 
-   private final ScheduledExecutorService scheduledThreadPool;
-
-   private ScheduledFuture future;
-
-   public ClientProducerCreditManagerImpl(final ClientSessionInternal session, 
final int windowSize, ScheduledExecutorService scheduledThreadPool) {
+   public ClientProducerCreditManagerImpl(final ClientSessionInternal session, 
final int windowSize) {
       this.session = session;
 
       this.windowSize = windowSize;
-
-      this.scheduledThreadPool = scheduledThreadPool;
    }
 
 
@@ -81,13 +77,13 @@ public class ClientProducerCreditManagerImpl implements 
ClientProducerCreditMana
                producerCredits.put(address, credits);
             }
 
-            if (!anon) {
+            if (anon) {
+               addToAnonymousList(address, credits);
+            } else {
                credits.incrementRefCount();
 
                // Remove from anon credits (if there)
-               unReferencedCredits.remove(address);
-            } else {
-               addToUnReferencedCache(address, credits);
+               anonymousCredits.remove(address);
             }
          }
 
@@ -115,7 +111,7 @@ public class ClientProducerCreditManagerImpl implements 
ClientProducerCreditMana
       ClientProducerCredits credits = producerCredits.get(address);
 
       if (credits != null && credits.decrementRefCount() == 0) {
-         addToUnReferencedCache(address, credits);
+         addToAnonymousList(address, credits);
       }
    }
 
@@ -154,11 +150,7 @@ public class ClientProducerCreditManagerImpl implements 
ClientProducerCreditMana
 
       producerCredits.clear();
 
-      unReferencedCredits.clear();
-
-      if (future != null) {
-         future.cancel(false);
-      }
+      anonymousCredits.clear();
    }
 
    @Override
@@ -167,34 +159,31 @@ public class ClientProducerCreditManagerImpl implements 
ClientProducerCreditMana
    }
 
    @Override
-   public synchronized int unReferencedCreditsSize() {
-      return unReferencedCredits.size();
+   public synchronized int getMaxAnonymousCacheSize() {
+      return anonymousCredits.size();
    }
 
-   private void addToUnReferencedCache(final SimpleString address, final 
ClientProducerCredits credits) {
-      unReferencedCredits.put(address, credits);
-
-      if (unReferencedCredits.size() > MAX_UNREFERENCED_CREDITS_CACHE_SIZE) {
-         // if we've exceeded our limit then try to clean up
-         if (future == null) {
-            future = scheduledThreadPool.scheduleWithFixedDelay(() -> {
-               synchronized (this) {
-                  Iterator<Map.Entry<SimpleString, ClientProducerCredits>> 
iter = unReferencedCredits.entrySet().iterator();
-                  while (iter.hasNext()) {
-                     Map.Entry<SimpleString, ClientProducerCredits> entry = 
iter.next();
-                     if (entry.getValue().getBalance() == 0) {
-                        iter.remove();
-                        producerCredits.remove(entry.getKey());
-                        entry.getValue().close();
-                     }
-                  }
+   private void addToAnonymousList(final SimpleString address, final 
ClientProducerCredits credits) {
+      anonymousCredits.put(address, credits);
+
+      if (anonymousCredits.size() > MAX_ANONYMOUS_CREDITS_CACHE_SIZE) {
+         logger.trace("Producer list has more than 
MAX_ANONYMOUS_CREDITS_CACHE_SIZE={}. clearing elements from the list", 
anonymousCredits.size());
+         try {
+            Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = 
anonymousCredits.entrySet().iterator();
+            while (iter.hasNext() && anonymousCredits.size() > 
MAX_ANONYMOUS_CREDITS_CACHE_SIZE) {
+               Map.Entry<SimpleString, ClientProducerCredits> entry = 
iter.next();
+               if (entry.getValue().getArriving() <= 0) {
+                  logger.trace("Removing credit {}, {}", entry.getKey(), 
entry.getValue());
+                  iter.remove();
+                  producerCredits.remove(entry.getKey());
+                  entry.getValue().close();
+               } else {
+                  logger.trace("Keeping credit for {}, {}", entry.getKey(), 
entry.getValue());
                }
-            }, 0, 30, TimeUnit.SECONDS);
-         }
-      } else {
-         // if we're below our limit make sure we're not trying to clean up
-         if (future != null) {
-            future.cancel(false);
+            }
+         } catch (Throwable e) {
+            // this is not really an expected error. no need for a logger code
+            logger.warn("Error clearing anonymousList - {}", e.getMessage(), 
e);
          }
       }
    }
@@ -203,6 +192,11 @@ public class ClientProducerCreditManagerImpl implements 
ClientProducerCreditMana
 
       static ClientProducerCreditsNoFlowControl instance = new 
ClientProducerCreditsNoFlowControl();
 
+      @Override
+      public int getArriving() {
+         return 0;
+      }
+
       @Override
       public void acquireCredits(int credits) {
       }
@@ -251,5 +245,4 @@ public class ClientProducerCreditManagerImpl implements 
ClientProducerCreditMana
          return 0;
       }
    }
-
 }
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
index 78c37fc0d8..203f11a8c4 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java
@@ -43,4 +43,6 @@ public interface ClientProducerCredits {
    SimpleString getAddress();
 
    int getBalance();
+
+   int getArriving();
 }
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 2a825b6d6b..9d6aeae416 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -248,7 +248,7 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
 
       this.groupID = groupID;
 
-      producerCreditManager = new ClientProducerCreditManagerImpl(this, 
producerWindowSize, scheduledThreadPool);
+      producerCreditManager = new ClientProducerCreditManagerImpl(this, 
producerWindowSize);
 
       this.sessionContext = sessionContext;
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ProducerFlowControlTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ProducerFlowControlTest.java
index 649e5e96bf..0d0c399381 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ProducerFlowControlTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ProducerFlowControlTest.java
@@ -36,6 +36,7 @@ import 
org.apache.activemq.artemis.api.core.client.MessageHandler;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import 
org.apache.activemq.artemis.core.client.impl.ClientProducerCreditManagerImpl;
 import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
+import org.apache.activemq.artemis.core.client.impl.ClientProducerFlowCallback;
 import org.apache.activemq.artemis.core.client.impl.ClientProducerInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -483,7 +484,7 @@ public class ProducerFlowControlTest extends 
ActiveMQTestBase {
 
       ClientProducerCredits credits = null;
 
-      for (int i = 0; i < 
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE * 2; i++) {
+      for (int i = 0; i < 
ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE * 2; i++) {
          ClientProducer prod = session.createProducer("address");
 
          ClientProducerCredits newCredits = ((ClientProducerInternal) 
prod).getProducerCredits();
@@ -495,7 +496,7 @@ public class ProducerFlowControlTest extends 
ActiveMQTestBase {
          credits = newCredits;
 
          Assert.assertEquals(1, ((ClientSessionInternal) 
session).getProducerCreditManager().creditsMapSize());
-         Assert.assertEquals(0, ((ClientSessionInternal) 
session).getProducerCreditManager().unReferencedCreditsSize());
+         Assert.assertEquals(0, ((ClientSessionInternal) 
session).getProducerCreditManager().getMaxAnonymousCacheSize());
       }
    }
 
@@ -513,7 +514,7 @@ public class ProducerFlowControlTest extends 
ActiveMQTestBase {
 
       ClientProducerCredits credits = null;
 
-      for (int i = 0; i < 
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE * 2; i++) {
+      for (int i = 0; i < 
ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE * 2; i++) {
          ClientProducer prod = session.createProducer("address");
 
          ClientProducerCredits newCredits = ((ClientProducerInternal) 
prod).getProducerCredits();
@@ -527,7 +528,7 @@ public class ProducerFlowControlTest extends 
ActiveMQTestBase {
          prod.close();
 
          Assert.assertEquals(1, ((ClientSessionInternal) 
session).getProducerCreditManager().creditsMapSize());
-         Assert.assertEquals(1, ((ClientSessionInternal) 
session).getProducerCreditManager().unReferencedCreditsSize());
+         Assert.assertEquals(1, ((ClientSessionInternal) 
session).getProducerCreditManager().getMaxAnonymousCacheSize());
       }
    }
 
@@ -546,7 +547,7 @@ public class ProducerFlowControlTest extends 
ActiveMQTestBase {
 
       ClientProducerCredits credits = null;
 
-      for (int i = 0; i < 
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++) {
+      for (int i = 0; i < 
ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE; i++) {
          ClientProducer prod = session.createProducer("address" + i);
 
          ClientProducerCredits newCredits = ((ClientProducerInternal) 
prod).getProducerCredits();
@@ -558,7 +559,7 @@ public class ProducerFlowControlTest extends 
ActiveMQTestBase {
          credits = newCredits;
 
          Assert.assertEquals(i + 1, ((ClientSessionInternal) 
session).getProducerCreditManager().creditsMapSize());
-         Assert.assertEquals(0, ((ClientSessionInternal) 
session).getProducerCreditManager().unReferencedCreditsSize());
+         Assert.assertEquals(0, ((ClientSessionInternal) 
session).getProducerCreditManager().getMaxAnonymousCacheSize());
       }
    }
 
@@ -576,7 +577,7 @@ public class ProducerFlowControlTest extends 
ActiveMQTestBase {
 
       ClientProducerCredits credits = null;
 
-      for (int i = 0; i < 
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++) {
+      for (int i = 0; i < 
ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE; i++) {
          ClientProducer prod = session.createProducer("address" + i);
 
          ClientProducerCredits newCredits = ((ClientProducerInternal) 
prod).getProducerCredits();
@@ -590,7 +591,7 @@ public class ProducerFlowControlTest extends 
ActiveMQTestBase {
          prod.close();
 
          Assert.assertEquals(i + 1, ((ClientSessionInternal) 
session).getProducerCreditManager().creditsMapSize());
-         Assert.assertEquals(i + 1, ((ClientSessionInternal) 
session).getProducerCreditManager().unReferencedCreditsSize());
+         Assert.assertEquals(i + 1, ((ClientSessionInternal) 
session).getProducerCreditManager().getMaxAnonymousCacheSize());
       }
    }
 
@@ -611,7 +612,7 @@ public class ProducerFlowControlTest extends 
ActiveMQTestBase {
 
       List<ClientProducerCredits> creditsList = new ArrayList<>();
 
-      for (int i = 0; i < 
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++) {
+      for (int i = 0; i < 
ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE; i++) {
          ClientProducer prod = session.createProducer("address" + i);
 
          ClientProducerCredits newCredits = ((ClientProducerInternal) 
prod).getProducerCredits();
@@ -623,29 +624,29 @@ public class ProducerFlowControlTest extends 
ActiveMQTestBase {
          credits = newCredits;
 
          Assert.assertEquals(i + 1, ((ClientSessionInternal) 
session).getProducerCreditManager().creditsMapSize());
-         Assert.assertEquals(0, ((ClientSessionInternal) 
session).getProducerCreditManager().unReferencedCreditsSize());
+         Assert.assertEquals(0, ((ClientSessionInternal) 
session).getProducerCreditManager().getMaxAnonymousCacheSize());
 
          creditsList.add(credits);
       }
 
       Iterator<ClientProducerCredits> iter = creditsList.iterator();
 
-      for (int i = 0; i < 
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++) {
+      for (int i = 0; i < 
ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE; i++) {
          ClientProducer prod = session.createProducer("address" + i);
 
          ClientProducerCredits newCredits = ((ClientProducerInternal) 
prod).getProducerCredits();
 
          Assert.assertTrue(newCredits == iter.next());
 
-         
Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
 ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize());
-         Assert.assertEquals(0, ((ClientSessionInternal) 
session).getProducerCreditManager().unReferencedCreditsSize());
+         
Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE,
 ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize());
+         Assert.assertEquals(0, ((ClientSessionInternal) 
session).getProducerCreditManager().getMaxAnonymousCacheSize());
       }
 
       for (int i = 0; i < 10; i++) {
-         session.createProducer("address" + (i + 
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE));
+         session.createProducer("address" + (i + 
ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE));
 
-         
Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE
 + i + 1, ((ClientSessionInternal) 
session).getProducerCreditManager().creditsMapSize());
-         Assert.assertEquals(0, ((ClientSessionInternal) 
session).getProducerCreditManager().unReferencedCreditsSize());
+         
Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE
 + i + 1, ((ClientSessionInternal) 
session).getProducerCreditManager().creditsMapSize());
+         Assert.assertEquals(0, ((ClientSessionInternal) 
session).getProducerCreditManager().getMaxAnonymousCacheSize());
       }
    }
 
@@ -661,13 +662,13 @@ public class ProducerFlowControlTest extends 
ActiveMQTestBase {
 
       session.createQueue(new 
QueueConfiguration("queue1").setAddress("address").setDurable(false));
 
-      for (int i = 0; i < 
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++) {
+      for (int i = 0; i < 
ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE; i++) {
          ClientProducer prod = session.createProducer((String) null);
 
          prod.send("address", session.createMessage(false));
 
          Assert.assertEquals(1, ((ClientSessionInternal) 
session).getProducerCreditManager().creditsMapSize());
-         Assert.assertEquals(1, ((ClientSessionInternal) 
session).getProducerCreditManager().unReferencedCreditsSize());
+         Assert.assertEquals(1, ((ClientSessionInternal) 
session).getProducerCreditManager().getMaxAnonymousCacheSize());
       }
    }
 
@@ -684,13 +685,13 @@ public class ProducerFlowControlTest extends 
ActiveMQTestBase {
 
       session.createQueue(new 
QueueConfiguration("queue1").setAddress("address").setDurable(false));
 
-      for (int i = 0; i < 
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++) {
+      for (int i = 0; i < 
ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE; i++) {
          ClientProducer prod = session.createProducer((String) null);
 
          prod.send("address" + i, session.createMessage(false));
 
          Assert.assertEquals(i + 1, ((ClientSessionInternal) 
session).getProducerCreditManager().creditsMapSize());
-         Assert.assertEquals(i + 1, ((ClientSessionInternal) 
session).getProducerCreditManager().unReferencedCreditsSize());
+         Assert.assertEquals(i + 1, ((ClientSessionInternal) 
session).getProducerCreditManager().getMaxAnonymousCacheSize());
       }
 
       for (int i = 0; i < 10; i++) {
@@ -698,8 +699,8 @@ public class ProducerFlowControlTest extends 
ActiveMQTestBase {
 
          prod.send("address" + i, session.createMessage(false));
 
-         
Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
 ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize());
-         
Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
 ((ClientSessionInternal) 
session).getProducerCreditManager().unReferencedCreditsSize());
+         
Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE,
 ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize());
+         
Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE,
 ((ClientSessionInternal) 
session).getProducerCreditManager().getMaxAnonymousCacheSize());
       }
 
       for (int i = 0; i < 10; i++) {
@@ -707,11 +708,63 @@ public class ProducerFlowControlTest extends 
ActiveMQTestBase {
 
          prod.send("address2-" + i, session.createMessage(false));
 
-         
Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
 ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize());
-         
Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
 ((ClientSessionInternal) 
session).getProducerCreditManager().unReferencedCreditsSize());
+         
Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE,
 ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize());
+         
Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE,
 ((ClientSessionInternal) 
session).getProducerCreditManager().getMaxAnonymousCacheSize());
       }
    }
 
+
+   @Test
+   public void testProducerCreditsCachingAsync() throws Exception {
+
+      class FlowCallback implements ClientProducerFlowCallback {
+         boolean blocked = false;
+
+         @Override
+         public void onCreditsFlow(boolean blocked, ClientProducerCredits 
producerCredits) {
+            if (blocked) {
+               this.blocked = true;
+            }
+         }
+
+         @Override
+         public void onCreditsFail(ClientProducerCredits credits) {
+
+         }
+      }
+
+      FlowCallback flowCallback = new FlowCallback();
+
+
+      server = createServer(false, isNetty());
+
+      server.start();
+      waitForServerToStart(server);
+
+      locator.setConfirmationWindowSize(10 * 1024);
+      sf = createSessionFactory(locator);
+
+      session = sf.createSession(false, true, true, true);
+
+      session.createQueue(new 
QueueConfiguration("queue1").setAddress("address").setDurable(false));
+
+      
((ClientSessionInternal)session).getProducerCreditManager().setCallback(flowCallback);
+
+      for (int i = 0; i < 2 * 
ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE; i++) {
+         ClientProducer prod = session.createProducer((String) null);
+
+         prod.send("address" + i, session.createMessage(false));
+
+         Assert.assertTrue(((ClientSessionInternal) 
session).getProducerCreditManager().creditsMapSize() <= 
ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE);
+         Assert.assertTrue(((ClientSessionInternal) 
session).getProducerCreditManager().getMaxAnonymousCacheSize() <= 
ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE);
+      }
+
+      session.close();
+      sf.createSession();
+
+      Assert.assertFalse(flowCallback.blocked);
+   }
+
    @Test
    public void testProducerCreditsRefCounting() throws Exception {
       server = createServer(false, isNetty());
@@ -727,30 +780,30 @@ public class ProducerFlowControlTest extends 
ActiveMQTestBase {
 
       ClientProducer prod1 = session.createProducer("address");
       Assert.assertEquals(1, ((ClientSessionInternal) 
session).getProducerCreditManager().creditsMapSize());
-      Assert.assertEquals(0, ((ClientSessionInternal) 
session).getProducerCreditManager().unReferencedCreditsSize());
+      Assert.assertEquals(0, ((ClientSessionInternal) 
session).getProducerCreditManager().getMaxAnonymousCacheSize());
 
       ClientProducer prod2 = session.createProducer("address");
       Assert.assertEquals(1, ((ClientSessionInternal) 
session).getProducerCreditManager().creditsMapSize());
-      Assert.assertEquals(0, ((ClientSessionInternal) 
session).getProducerCreditManager().unReferencedCreditsSize());
+      Assert.assertEquals(0, ((ClientSessionInternal) 
session).getProducerCreditManager().getMaxAnonymousCacheSize());
 
       ClientProducer prod3 = session.createProducer("address");
       Assert.assertEquals(1, ((ClientSessionInternal) 
session).getProducerCreditManager().creditsMapSize());
-      Assert.assertEquals(0, ((ClientSessionInternal) 
session).getProducerCreditManager().unReferencedCreditsSize());
+      Assert.assertEquals(0, ((ClientSessionInternal) 
session).getProducerCreditManager().getMaxAnonymousCacheSize());
 
       prod1.close();
 
       Assert.assertEquals(1, ((ClientSessionInternal) 
session).getProducerCreditManager().creditsMapSize());
-      Assert.assertEquals(0, ((ClientSessionInternal) 
session).getProducerCreditManager().unReferencedCreditsSize());
+      Assert.assertEquals(0, ((ClientSessionInternal) 
session).getProducerCreditManager().getMaxAnonymousCacheSize());
 
       prod2.close();
 
       Assert.assertEquals(1, ((ClientSessionInternal) 
session).getProducerCreditManager().creditsMapSize());
-      Assert.assertEquals(0, ((ClientSessionInternal) 
session).getProducerCreditManager().unReferencedCreditsSize());
+      Assert.assertEquals(0, ((ClientSessionInternal) 
session).getProducerCreditManager().getMaxAnonymousCacheSize());
 
       prod3.close();
 
       Assert.assertEquals(1, ((ClientSessionInternal) 
session).getProducerCreditManager().creditsMapSize());
-      Assert.assertEquals(1, ((ClientSessionInternal) 
session).getProducerCreditManager().unReferencedCreditsSize());
+      Assert.assertEquals(1, ((ClientSessionInternal) 
session).getProducerCreditManager().getMaxAnonymousCacheSize());
    }
 
 }

Reply via email to