This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new c6a82ff95e ARTEMIS-4314 Small Tweak: using executor directly if no delay
c6a82ff95e is described below

commit c6a82ff95ef94cd192fa17c515d535d42dbafb8e
Author: Clebert Suconic <clebertsuco...@apache.org>
AuthorDate: Fri Jun 16 16:13:49 2023 -0400

    ARTEMIS-4314 Small Tweak: using executor directly if no delay
---
 .../federation/FederatedQueueConsumerImpl.java     | 36 +++++++++++++---------
 1 file changed, 21 insertions(+), 15 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
index d2470e5a6b..ca304719cf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
@@ -208,23 +208,29 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
    }
 
    private void scheduleCreditOnEmpty(final int delay, final QueueHandle handle) {
-      scheduledExecutorService.schedule(() -> {
-         // use queue executor to sync on message count metric
-         handle.getExecutor().execute(() -> {
-            if (clientConsumer != null) {
-               if (0L == handle.getMessageCount()) {
-                  flow(handle.getCreditWindow());
-                  pendingPullCredit.set(handle.getCreditWindow());
-               } else {
-                  if (0 == delay) {
-                     clientConsumer.resetIfSlowConsumer();
-                     pendingPullCredit.set(0);
-                  }
-                  scheduleCreditOnEmpty(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax), handle);
+
+      Runnable runnable = () -> {
+         if (clientConsumer != null) {
+            if (0L == handle.getMessageCount()) {
+               flow(handle.getCreditWindow());
+               pendingPullCredit.set(handle.getCreditWindow());
+            } else {
+               if (0 == delay) {
+                  clientConsumer.resetIfSlowConsumer();
+                  pendingPullCredit.set(0);
                }
+               scheduleCreditOnEmpty(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax), handle);
             }
-         });
-      }, delay, TimeUnit.SECONDS);
+         }
+      };
+
+      if (delay == 0) { // if delay==0 just use the executor directly
+         handle.getExecutor().execute(runnable);
+      } else {
+         scheduledExecutorService.schedule(() -> {
+            handle.getExecutor().execute(runnable);
+         }, delay, TimeUnit.SECONDS);
+      }
    }
 
    private void flow(int creditWindow) {

Reply via email to