[ 
https://issues.apache.org/jira/browse/ARTEMIS-4453?focusedWorklogId=884168&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-884168
 ]

ASF GitHub Bot logged work on ARTEMIS-4453:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Oct/23 19:15
            Start Date: 09/Oct/23 19:15
    Worklog Time Spent: 10m 
      Work Description: clebertsuconic commented on code in PR #4643:
URL: https://github.com/apache/activemq-artemis/pull/4643#discussion_r1350713249


##########
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java:
##########
@@ -162,26 +175,30 @@ private void addToUnReferencedCache(final SimpleString 
address, final ClientProd
       unReferencedCredits.put(address, credits);
 
       if (unReferencedCredits.size() > MAX_UNREFERENCED_CREDITS_CACHE_SIZE) {
-         // Remove the oldest entry
-
-         Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = 
unReferencedCredits.entrySet().iterator();
-
-         Map.Entry<SimpleString, ClientProducerCredits> oldest = iter.next();
-
-         iter.remove();
-
-         removeEntry(oldest.getKey(), oldest.getValue());
+         // if we've exceeded our limit then try to clean up
+         if (future == null) {
+            future = scheduledThreadPool.scheduleWithFixedDelay(() -> {
+               synchronized (this) {
+                  Iterator<Map.Entry<SimpleString, ClientProducerCredits>> 
iter = unReferencedCredits.entrySet().iterator();
+                  while (iter.hasNext()) {
+                     Map.Entry<SimpleString, ClientProducerCredits> entry = 
iter.next();
+                     if (entry.getValue().getBalance() == 0) {
+                        iter.remove();
+                        producerCredits.remove(entry.getKey());
+                        entry.getValue().close();
+                     }
+                  }
+               }
+            }, 0, 30, TimeUnit.SECONDS);

Review Comment:
   You are using the scheduledPool directly here. 
   
   I would suggest using an encapsulation from ActiveMQSCheduledComponent. That 
way you call start/stop based on the results.
   



##########
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java:
##########
@@ -162,26 +175,30 @@ private void addToUnReferencedCache(final SimpleString 
address, final ClientProd
       unReferencedCredits.put(address, credits);
 
       if (unReferencedCredits.size() > MAX_UNREFERENCED_CREDITS_CACHE_SIZE) {
-         // Remove the oldest entry
-
-         Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = 
unReferencedCredits.entrySet().iterator();
-
-         Map.Entry<SimpleString, ClientProducerCredits> oldest = iter.next();
-
-         iter.remove();
-
-         removeEntry(oldest.getKey(), oldest.getValue());
+         // if we've exceeded our limit then try to clean up
+         if (future == null) {
+            future = scheduledThreadPool.scheduleWithFixedDelay(() -> {
+               synchronized (this) {
+                  Iterator<Map.Entry<SimpleString, ClientProducerCredits>> 
iter = unReferencedCredits.entrySet().iterator();
+                  while (iter.hasNext()) {
+                     Map.Entry<SimpleString, ClientProducerCredits> entry = 
iter.next();
+                     if (entry.getValue().getBalance() == 0) {
+                        iter.remove();
+                        producerCredits.remove(entry.getKey());
+                        entry.getValue().close();
+                     }
+                  }
+               }
+            }, 0, 30, TimeUnit.SECONDS);

Review Comment:
   also: Perhaps we could have this scheduling done somewhere else?



##########
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java:
##########
@@ -162,26 +175,30 @@ private void addToUnReferencedCache(final SimpleString 
address, final ClientProd
       unReferencedCredits.put(address, credits);
 
       if (unReferencedCredits.size() > MAX_UNREFERENCED_CREDITS_CACHE_SIZE) {
-         // Remove the oldest entry
-
-         Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = 
unReferencedCredits.entrySet().iterator();
-
-         Map.Entry<SimpleString, ClientProducerCredits> oldest = iter.next();
-
-         iter.remove();
-
-         removeEntry(oldest.getKey(), oldest.getValue());
+         // if we've exceeded our limit then try to clean up
+         if (future == null) {
+            future = scheduledThreadPool.scheduleWithFixedDelay(() -> {
+               synchronized (this) {
+                  Iterator<Map.Entry<SimpleString, ClientProducerCredits>> 
iter = unReferencedCredits.entrySet().iterator();
+                  while (iter.hasNext()) {
+                     Map.Entry<SimpleString, ClientProducerCredits> entry = 
iter.next();
+                     if (entry.getValue().getBalance() == 0) {
+                        iter.remove();
+                        producerCredits.remove(entry.getKey());
+                        entry.getValue().close();
+                     }
+                  }
+               }
+            }, 0, 30, TimeUnit.SECONDS);
+         }
+      } else {
+         // if we're below our limit make sure we're not trying to clean up
+         if (future != null) {
+            future.cancel(false);
+         }
       }

Review Comment:
   if you use the ActiveMQScheuledComponent approach I mentioned this will go 
away, but you should also set future=null, otherwise you will keep calling 
cancel. But as I said.. if you use the scheduledComponent I think that goes 
away.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 884168)
    Time Spent: 1h 20m  (was: 1h 10m)

> Bridge blocked by flow control, seemingly forever
> -------------------------------------------------
>
>                 Key: ARTEMIS-4453
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4453
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: Broker, Clustering
>            Reporter: Andy Gustafson
>            Assignee: Justin Bertram
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> I found what I believe to be a bug related to flow control with symmetric 
> clusters. I wrote [this 
> example|https://github.com/acgustafson/artemis-flowcontrol-issue] to 
> reproduce  the behavior I am seeing. The test program does the following:
>  * Create 2 clustered embedded brokers
>  * Create 3000 topics with 1 producer per topic sending 1 message every 30 
> seconds with 1500 producers on each broker.
>  * Create 12 consumers on a wildcard that matches all 3000 topics with 6 
> consumers on each broker
>  * Create 1 additional "statistics" wildcard consumer to log how many 
> messages it has consumed over the previous 30 seconds
> Shortly after running this program the "statistics consumer" shows it's only 
> consuming 1500 messages per 30 seconds. The broker's logs show the 
> $artemis.internal.sf queues are being blocked on flow control. From what I 
> can tell the broker never seems to recover from this, even after killing all 
> of the producers/consumers and restarting with a much lower number of 
> producers/consumers. 
> This behaviour started appearing for me in 2.22.0 after 
> https://issues.apache.org/jira/browse/ARTEMIS-3805 changed the default 
> producer-window-size to 1MB from -1.
>  
> I can reproduce the issue using the latest snapshot. If you set the 
> producer-window-size back to -1 the issue no longer appears. In my test 
> program I used wildcards, but I was able to reproduce this without wildcards 
> as well.
>  
> Program to reproduce: 
> [https://github.com/acgustafson/artemis-flowcontrol-issue]
>  
> Discussion of this issue in slack at 
> [https://the-asf.slack.com/archives/CFL910J30/p1696356372797149?thread_ts=1694705189.957139&cid=CFL910J30]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to