This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 45f27895319db445d71276f6a90ac090c35c7da1 Author: Clebert Suconic <[email protected]> AuthorDate: Mon Oct 9 15:44:53 2023 -0400 ARTEMIS-4453 Asynchronous flow control broken when more than 1000 destinations in the producer --- .../client/impl/AbstractProducerCreditsImpl.java | 20 ++-- .../client/impl/ClientProducerCreditManager.java | 2 +- .../impl/ClientProducerCreditManagerImpl.java | 89 ++++++++-------- .../core/client/impl/ClientProducerCredits.java | 2 + .../core/client/impl/ClientSessionImpl.java | 2 +- .../client/ProducerFlowControlTest.java | 113 +++++++++++++++------ 6 files changed, 141 insertions(+), 87 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java index a2fe15ffe2..ac83c29b1a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java @@ -59,6 +59,11 @@ public abstract class AbstractProducerCreditsImpl implements ClientProducerCredi this.windowSize = windowSize / 2; } + @Override + public int getArriving() { + return arriving; + } + @Override public SimpleString getAddress() { return address; @@ -86,13 +91,12 @@ public abstract class AbstractProducerCreditsImpl implements ClientProducerCredi } protected void afterAcquired(int credits) throws ActiveMQAddressFullException { - if (logger.isDebugEnabled()) { - logger.debug("AfterAcquire {} credits on address {}", credits, address); - } - synchronized (this) { pendingCredits -= credits; } + if (logger.isDebugEnabled()) { + logger.debug("AfterAcquire {} credits on address {}, pendingCredits={}", credits, address, pendingCredits); + } } protected abstract void actualAcquire(int credits); @@ -149,9 +153,6 @@ public abstract class AbstractProducerCreditsImpl implements ClientProducerCredi return --refCount; } - @Override - public abstract int getBalance(); - protected void checkCredits(final int credits) { int needed = Math.max(credits, windowSize); if (logger.isTraceEnabled()) { @@ -192,4 +193,9 @@ public abstract class AbstractProducerCreditsImpl implements ClientProducerCredi } session.sendProducerCreditsMessage(credits, address); } + + @Override + public String toString() { + return this.getClass().getName() + "{" + "pendingCredits=" + pendingCredits + ", windowSize=" + windowSize + ", closed=" + closed + ", blocked=" + blocked + ", address=" + address + ", arriving=" + arriving + ", refCount=" + refCount + ", serverRespondedWithFail=" + serverRespondedWithFail + '}'; + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java index bb65f72c7a..3cdf6ed44f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java @@ -35,7 +35,7 @@ public interface ClientProducerCreditManager { int creditsMapSize(); - int unReferencedCreditsSize(); + int getMaxAnonymousCacheSize(); /** This will determine the flow control as asynchronous, * no actual block should happen instead a callback will be sent whenever blockages change */ diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java index c71082be5a..fba2928343 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java @@ -16,23 +16,25 @@ */ package org.apache.activemq.artemis.core.client.impl; +import java.lang.invoke.MethodHandles; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ClientProducerCreditManagerImpl implements ClientProducerCreditManager { - public static final int MAX_UNREFERENCED_CREDITS_CACHE_SIZE = 1000; + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public static final int MAX_ANONYMOUS_CREDITS_CACHE_SIZE = 1000; private final Map<SimpleString, ClientProducerCredits> producerCredits = new LinkedHashMap<>(); - private final Map<SimpleString, ClientProducerCredits> unReferencedCredits = new LinkedHashMap<>(); + private final Map<SimpleString, ClientProducerCredits> anonymousCredits = new LinkedHashMap<>(); private final ClientSessionInternal session; @@ -40,16 +42,10 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana private ClientProducerFlowCallback callback; - private final ScheduledExecutorService scheduledThreadPool; - - private ScheduledFuture future; - - public ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize, ScheduledExecutorService scheduledThreadPool) { + public ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize) { this.session = session; this.windowSize = windowSize; - - this.scheduledThreadPool = scheduledThreadPool; } @@ -81,13 +77,13 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana producerCredits.put(address, credits); } - if (!anon) { + if (anon) { + addToAnonymousList(address, credits); + } else { credits.incrementRefCount(); // Remove from anon credits (if there) - unReferencedCredits.remove(address); - } else { - addToUnReferencedCache(address, credits); + anonymousCredits.remove(address); } } @@ -115,7 +111,7 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana ClientProducerCredits credits = producerCredits.get(address); if (credits != null && credits.decrementRefCount() == 0) { - addToUnReferencedCache(address, credits); + addToAnonymousList(address, credits); } } @@ -154,11 +150,7 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana producerCredits.clear(); - unReferencedCredits.clear(); - - if (future != null) { - future.cancel(false); - } + anonymousCredits.clear(); } @Override @@ -167,34 +159,31 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana } @Override - public synchronized int unReferencedCreditsSize() { - return unReferencedCredits.size(); + public synchronized int getMaxAnonymousCacheSize() { + return anonymousCredits.size(); } - private void addToUnReferencedCache(final SimpleString address, final ClientProducerCredits credits) { - unReferencedCredits.put(address, credits); - - if (unReferencedCredits.size() > MAX_UNREFERENCED_CREDITS_CACHE_SIZE) { - // if we've exceeded our limit then try to clean up - if (future == null) { - future = scheduledThreadPool.scheduleWithFixedDelay(() -> { - synchronized (this) { - Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = unReferencedCredits.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry<SimpleString, ClientProducerCredits> entry = iter.next(); - if (entry.getValue().getBalance() == 0) { - iter.remove(); - producerCredits.remove(entry.getKey()); - entry.getValue().close(); - } - } + private void addToAnonymousList(final SimpleString address, final ClientProducerCredits credits) { + anonymousCredits.put(address, credits); + + if (anonymousCredits.size() > MAX_ANONYMOUS_CREDITS_CACHE_SIZE) { + logger.trace("Producer list has more than MAX_ANONYMOUS_CREDITS_CACHE_SIZE={}. clearing elements from the list", anonymousCredits.size()); + try { + Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = anonymousCredits.entrySet().iterator(); + while (iter.hasNext() && anonymousCredits.size() > MAX_ANONYMOUS_CREDITS_CACHE_SIZE) { + Map.Entry<SimpleString, ClientProducerCredits> entry = iter.next(); + if (entry.getValue().getArriving() <= 0) { + logger.trace("Removing credit {}, {}", entry.getKey(), entry.getValue()); + iter.remove(); + producerCredits.remove(entry.getKey()); + entry.getValue().close(); + } else { + logger.trace("Keeping credit for {}, {}", entry.getKey(), entry.getValue()); } - }, 0, 30, TimeUnit.SECONDS); - } - } else { - // if we're below our limit make sure we're not trying to clean up - if (future != null) { - future.cancel(false); + } + } catch (Throwable e) { + // this is not really an expected error. no need for a logger code + logger.warn("Error clearing anonymousList - {}", e.getMessage(), e); } } } @@ -203,6 +192,11 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana static ClientProducerCreditsNoFlowControl instance = new ClientProducerCreditsNoFlowControl(); + @Override + public int getArriving() { + return 0; + } + @Override public void acquireCredits(int credits) { } @@ -251,5 +245,4 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana return 0; } } - } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java index 78c37fc0d8..203f11a8c4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java @@ -43,4 +43,6 @@ public interface ClientProducerCredits { SimpleString getAddress(); int getBalance(); + + int getArriving(); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index 2a825b6d6b..9d6aeae416 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -248,7 +248,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi this.groupID = groupID; - producerCreditManager = new ClientProducerCreditManagerImpl(this, producerWindowSize, scheduledThreadPool); + producerCreditManager = new ClientProducerCreditManagerImpl(this, producerWindowSize); this.sessionContext = sessionContext; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ProducerFlowControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ProducerFlowControlTest.java index 649e5e96bf..0d0c399381 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ProducerFlowControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ProducerFlowControlTest.java @@ -36,6 +36,7 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditManagerImpl; import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits; +import org.apache.activemq.artemis.core.client.impl.ClientProducerFlowCallback; import org.apache.activemq.artemis.core.client.impl.ClientProducerInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -483,7 +484,7 @@ public class ProducerFlowControlTest extends ActiveMQTestBase { ClientProducerCredits credits = null; - for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE * 2; i++) { + for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE * 2; i++) { ClientProducer prod = session.createProducer("address"); ClientProducerCredits newCredits = ((ClientProducerInternal) prod).getProducerCredits(); @@ -495,7 +496,7 @@ public class ProducerFlowControlTest extends ActiveMQTestBase { credits = newCredits; Assert.assertEquals(1, ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize()); - Assert.assertEquals(0, ((ClientSessionInternal) session).getProducerCreditManager().unReferencedCreditsSize()); + Assert.assertEquals(0, ((ClientSessionInternal) session).getProducerCreditManager().getMaxAnonymousCacheSize()); } } @@ -513,7 +514,7 @@ public class ProducerFlowControlTest extends ActiveMQTestBase { ClientProducerCredits credits = null; - for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE * 2; i++) { + for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE * 2; i++) { ClientProducer prod = session.createProducer("address"); ClientProducerCredits newCredits = ((ClientProducerInternal) prod).getProducerCredits(); @@ -527,7 +528,7 @@ public class ProducerFlowControlTest extends ActiveMQTestBase { prod.close(); Assert.assertEquals(1, ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize()); - Assert.assertEquals(1, ((ClientSessionInternal) session).getProducerCreditManager().unReferencedCreditsSize()); + Assert.assertEquals(1, ((ClientSessionInternal) session).getProducerCreditManager().getMaxAnonymousCacheSize()); } } @@ -546,7 +547,7 @@ public class ProducerFlowControlTest extends ActiveMQTestBase { ClientProducerCredits credits = null; - for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++) { + for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE; i++) { ClientProducer prod = session.createProducer("address" + i); ClientProducerCredits newCredits = ((ClientProducerInternal) prod).getProducerCredits(); @@ -558,7 +559,7 @@ public class ProducerFlowControlTest extends ActiveMQTestBase { credits = newCredits; Assert.assertEquals(i + 1, ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize()); - Assert.assertEquals(0, ((ClientSessionInternal) session).getProducerCreditManager().unReferencedCreditsSize()); + Assert.assertEquals(0, ((ClientSessionInternal) session).getProducerCreditManager().getMaxAnonymousCacheSize()); } } @@ -576,7 +577,7 @@ public class ProducerFlowControlTest extends ActiveMQTestBase { ClientProducerCredits credits = null; - for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++) { + for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE; i++) { ClientProducer prod = session.createProducer("address" + i); ClientProducerCredits newCredits = ((ClientProducerInternal) prod).getProducerCredits(); @@ -590,7 +591,7 @@ public class ProducerFlowControlTest extends ActiveMQTestBase { prod.close(); Assert.assertEquals(i + 1, ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize()); - Assert.assertEquals(i + 1, ((ClientSessionInternal) session).getProducerCreditManager().unReferencedCreditsSize()); + Assert.assertEquals(i + 1, ((ClientSessionInternal) session).getProducerCreditManager().getMaxAnonymousCacheSize()); } } @@ -611,7 +612,7 @@ public class ProducerFlowControlTest extends ActiveMQTestBase { List<ClientProducerCredits> creditsList = new ArrayList<>(); - for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++) { + for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE; i++) { ClientProducer prod = session.createProducer("address" + i); ClientProducerCredits newCredits = ((ClientProducerInternal) prod).getProducerCredits(); @@ -623,29 +624,29 @@ public class ProducerFlowControlTest extends ActiveMQTestBase { credits = newCredits; Assert.assertEquals(i + 1, ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize()); - Assert.assertEquals(0, ((ClientSessionInternal) session).getProducerCreditManager().unReferencedCreditsSize()); + Assert.assertEquals(0, ((ClientSessionInternal) session).getProducerCreditManager().getMaxAnonymousCacheSize()); creditsList.add(credits); } Iterator<ClientProducerCredits> iter = creditsList.iterator(); - for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++) { + for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE; i++) { ClientProducer prod = session.createProducer("address" + i); ClientProducerCredits newCredits = ((ClientProducerInternal) prod).getProducerCredits(); Assert.assertTrue(newCredits == iter.next()); - Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE, ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize()); - Assert.assertEquals(0, ((ClientSessionInternal) session).getProducerCreditManager().unReferencedCreditsSize()); + Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE, ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize()); + Assert.assertEquals(0, ((ClientSessionInternal) session).getProducerCreditManager().getMaxAnonymousCacheSize()); } for (int i = 0; i < 10; i++) { - session.createProducer("address" + (i + ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE)); + session.createProducer("address" + (i + ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE)); - Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE + i + 1, ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize()); - Assert.assertEquals(0, ((ClientSessionInternal) session).getProducerCreditManager().unReferencedCreditsSize()); + Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE + i + 1, ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize()); + Assert.assertEquals(0, ((ClientSessionInternal) session).getProducerCreditManager().getMaxAnonymousCacheSize()); } } @@ -661,13 +662,13 @@ public class ProducerFlowControlTest extends ActiveMQTestBase { session.createQueue(new QueueConfiguration("queue1").setAddress("address").setDurable(false)); - for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++) { + for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE; i++) { ClientProducer prod = session.createProducer((String) null); prod.send("address", session.createMessage(false)); Assert.assertEquals(1, ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize()); - Assert.assertEquals(1, ((ClientSessionInternal) session).getProducerCreditManager().unReferencedCreditsSize()); + Assert.assertEquals(1, ((ClientSessionInternal) session).getProducerCreditManager().getMaxAnonymousCacheSize()); } } @@ -684,13 +685,13 @@ public class ProducerFlowControlTest extends ActiveMQTestBase { session.createQueue(new QueueConfiguration("queue1").setAddress("address").setDurable(false)); - for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++) { + for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE; i++) { ClientProducer prod = session.createProducer((String) null); prod.send("address" + i, session.createMessage(false)); Assert.assertEquals(i + 1, ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize()); - Assert.assertEquals(i + 1, ((ClientSessionInternal) session).getProducerCreditManager().unReferencedCreditsSize()); + Assert.assertEquals(i + 1, ((ClientSessionInternal) session).getProducerCreditManager().getMaxAnonymousCacheSize()); } for (int i = 0; i < 10; i++) { @@ -698,8 +699,8 @@ public class ProducerFlowControlTest extends ActiveMQTestBase { prod.send("address" + i, session.createMessage(false)); - Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE, ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize()); - Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE, ((ClientSessionInternal) session).getProducerCreditManager().unReferencedCreditsSize()); + Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE, ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize()); + Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE, ((ClientSessionInternal) session).getProducerCreditManager().getMaxAnonymousCacheSize()); } for (int i = 0; i < 10; i++) { @@ -707,11 +708,63 @@ public class ProducerFlowControlTest extends ActiveMQTestBase { prod.send("address2-" + i, session.createMessage(false)); - Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE, ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize()); - Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE, ((ClientSessionInternal) session).getProducerCreditManager().unReferencedCreditsSize()); + Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE, ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize()); + Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE, ((ClientSessionInternal) session).getProducerCreditManager().getMaxAnonymousCacheSize()); } } + + @Test + public void testProducerCreditsCachingAsync() throws Exception { + + class FlowCallback implements ClientProducerFlowCallback { + boolean blocked = false; + + @Override + public void onCreditsFlow(boolean blocked, ClientProducerCredits producerCredits) { + if (blocked) { + this.blocked = true; + } + } + + @Override + public void onCreditsFail(ClientProducerCredits credits) { + + } + } + + FlowCallback flowCallback = new FlowCallback(); + + + server = createServer(false, isNetty()); + + server.start(); + waitForServerToStart(server); + + locator.setConfirmationWindowSize(10 * 1024); + sf = createSessionFactory(locator); + + session = sf.createSession(false, true, true, true); + + session.createQueue(new QueueConfiguration("queue1").setAddress("address").setDurable(false)); + + ((ClientSessionInternal)session).getProducerCreditManager().setCallback(flowCallback); + + for (int i = 0; i < 2 * ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE; i++) { + ClientProducer prod = session.createProducer((String) null); + + prod.send("address" + i, session.createMessage(false)); + + Assert.assertTrue(((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize() <= ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE); + Assert.assertTrue(((ClientSessionInternal) session).getProducerCreditManager().getMaxAnonymousCacheSize() <= ClientProducerCreditManagerImpl.MAX_ANONYMOUS_CREDITS_CACHE_SIZE); + } + + session.close(); + sf.createSession(); + + Assert.assertFalse(flowCallback.blocked); + } + @Test public void testProducerCreditsRefCounting() throws Exception { server = createServer(false, isNetty()); @@ -727,30 +780,30 @@ public class ProducerFlowControlTest extends ActiveMQTestBase { ClientProducer prod1 = session.createProducer("address"); Assert.assertEquals(1, ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize()); - Assert.assertEquals(0, ((ClientSessionInternal) session).getProducerCreditManager().unReferencedCreditsSize()); + Assert.assertEquals(0, ((ClientSessionInternal) session).getProducerCreditManager().getMaxAnonymousCacheSize()); ClientProducer prod2 = session.createProducer("address"); Assert.assertEquals(1, ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize()); - Assert.assertEquals(0, ((ClientSessionInternal) session).getProducerCreditManager().unReferencedCreditsSize()); + Assert.assertEquals(0, ((ClientSessionInternal) session).getProducerCreditManager().getMaxAnonymousCacheSize()); ClientProducer prod3 = session.createProducer("address"); Assert.assertEquals(1, ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize()); - Assert.assertEquals(0, ((ClientSessionInternal) session).getProducerCreditManager().unReferencedCreditsSize()); + Assert.assertEquals(0, ((ClientSessionInternal) session).getProducerCreditManager().getMaxAnonymousCacheSize()); prod1.close(); Assert.assertEquals(1, ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize()); - Assert.assertEquals(0, ((ClientSessionInternal) session).getProducerCreditManager().unReferencedCreditsSize()); + Assert.assertEquals(0, ((ClientSessionInternal) session).getProducerCreditManager().getMaxAnonymousCacheSize()); prod2.close(); Assert.assertEquals(1, ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize()); - Assert.assertEquals(0, ((ClientSessionInternal) session).getProducerCreditManager().unReferencedCreditsSize()); + Assert.assertEquals(0, ((ClientSessionInternal) session).getProducerCreditManager().getMaxAnonymousCacheSize()); prod3.close(); Assert.assertEquals(1, ((ClientSessionInternal) session).getProducerCreditManager().creditsMapSize()); - Assert.assertEquals(1, ((ClientSessionInternal) session).getProducerCreditManager().unReferencedCreditsSize()); + Assert.assertEquals(1, ((ClientSessionInternal) session).getProducerCreditManager().getMaxAnonymousCacheSize()); } }
