This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new c6a82ff95e ARTEMIS-4314 Small Tweak: using executor directly if no delay
c6a82ff95e is described below
commit c6a82ff95ef94cd192fa17c515d535d42dbafb8e
Author: Clebert Suconic <[email protected]>
AuthorDate: Fri Jun 16 16:13:49 2023 -0400
ARTEMIS-4314 Small Tweak: using executor directly if no delay
---
.../federation/FederatedQueueConsumerImpl.java | 36 +++++++++++++---------
1 file changed, 21 insertions(+), 15 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
index d2470e5a6b..ca304719cf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
@@ -208,23 +208,29 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
}
private void scheduleCreditOnEmpty(final int delay, final QueueHandle handle) {
- scheduledExecutorService.schedule(() -> {
- // use queue executor to sync on message count metric
- handle.getExecutor().execute(() -> {
- if (clientConsumer != null) {
- if (0L == handle.getMessageCount()) {
- flow(handle.getCreditWindow());
- pendingPullCredit.set(handle.getCreditWindow());
- } else {
- if (0 == delay) {
- clientConsumer.resetIfSlowConsumer();
- pendingPullCredit.set(0);
- }
- scheduleCreditOnEmpty(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax), handle);
+
+ Runnable runnable = () -> {
+ if (clientConsumer != null) {
+ if (0L == handle.getMessageCount()) {
+ flow(handle.getCreditWindow());
+ pendingPullCredit.set(handle.getCreditWindow());
+ } else {
+ if (0 == delay) {
+ clientConsumer.resetIfSlowConsumer();
+ pendingPullCredit.set(0);
}
+ scheduleCreditOnEmpty(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax), handle);
}
- });
- }, delay, TimeUnit.SECONDS);
+ }
+ };
+
+ if (delay == 0) { // if delay==0 just use the executor directly
+ handle.getExecutor().execute(runnable);
+ } else {
+ scheduledExecutorService.schedule(() -> {
+ handle.getExecutor().execute(runnable);
+ }, delay, TimeUnit.SECONDS);
+ }
}
private void flow(int creditWindow) {