This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new c6a82ff95e ARTEMIS-4314 Small Tweak: using executor directly if no delay c6a82ff95e is described below commit c6a82ff95ef94cd192fa17c515d535d42dbafb8e Author: Clebert Suconic <clebertsuco...@apache.org> AuthorDate: Fri Jun 16 16:13:49 2023 -0400 ARTEMIS-4314 Small Tweak: using executor directly if no delay --- .../federation/FederatedQueueConsumerImpl.java | 36 +++++++++++++--------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java index d2470e5a6b..ca304719cf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java @@ -208,23 +208,29 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi } private void scheduleCreditOnEmpty(final int delay, final QueueHandle handle) { - scheduledExecutorService.schedule(() -> { - // use queue executor to sync on message count metric - handle.getExecutor().execute(() -> { - if (clientConsumer != null) { - if (0L == handle.getMessageCount()) { - flow(handle.getCreditWindow()); - pendingPullCredit.set(handle.getCreditWindow()); - } else { - if (0 == delay) { - clientConsumer.resetIfSlowConsumer(); - pendingPullCredit.set(0); - } - scheduleCreditOnEmpty(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax), handle); + + Runnable runnable = () -> { + if (clientConsumer != null) { + if (0L == handle.getMessageCount()) { + flow(handle.getCreditWindow()); + pendingPullCredit.set(handle.getCreditWindow()); + } else { + if (0 == delay) { + clientConsumer.resetIfSlowConsumer(); + pendingPullCredit.set(0); } + scheduleCreditOnEmpty(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax), handle); } - }); - }, delay, TimeUnit.SECONDS); + } + }; + + if (delay == 0) { // if delay==0 just use the executor directly + handle.getExecutor().execute(runnable); + } else { + scheduledExecutorService.schedule(() -> { + handle.getExecutor().execute(runnable); + }, delay, TimeUnit.SECONDS); + } } private void flow(int creditWindow) {