This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new a76e9f81d2 Revert "ARTEMIS-4314 Small Tweak: using executor directly if no delay"
a76e9f81d2 is described below
commit a76e9f81d2b6f01ec946d9b81aef0a57306451cb
Author: Gary Tully <[email protected]>
AuthorDate: Thu Jun 22 15:40:45 2023 +0100
Revert "ARTEMIS-4314 Small Tweak: using executor directly if no delay"
This reverts commit c6a82ff95ef94cd192fa17c515d535d42dbafb8e.
---
.../federation/FederatedQueueConsumerImpl.java | 36 +++++++++-------------
1 file changed, 15 insertions(+), 21 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
index ca304719cf..d2470e5a6b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
@@ -208,29 +208,23 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
}
private void scheduleCreditOnEmpty(final int delay, final QueueHandle handle) {
-
- Runnable runnable = () -> {
- if (clientConsumer != null) {
- if (0L == handle.getMessageCount()) {
- flow(handle.getCreditWindow());
- pendingPullCredit.set(handle.getCreditWindow());
- } else {
- if (0 == delay) {
- clientConsumer.resetIfSlowConsumer();
- pendingPullCredit.set(0);
+ scheduledExecutorService.schedule(() -> {
+ // use queue executor to sync on message count metric
+ handle.getExecutor().execute(() -> {
+ if (clientConsumer != null) {
+ if (0L == handle.getMessageCount()) {
+ flow(handle.getCreditWindow());
+ pendingPullCredit.set(handle.getCreditWindow());
+ } else {
+ if (0 == delay) {
+ clientConsumer.resetIfSlowConsumer();
+ pendingPullCredit.set(0);
+ }
+ scheduleCreditOnEmpty(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax), handle);
}
- scheduleCreditOnEmpty(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax), handle);
}
- }
- };
-
- if (delay == 0) { // if delay==0 just use the executor directly
- handle.getExecutor().execute(runnable);
- } else {
- scheduledExecutorService.schedule(() -> {
- handle.getExecutor().execute(runnable);
- }, delay, TimeUnit.SECONDS);
- }
+ });
+ }, delay, TimeUnit.SECONDS);
}
private void flow(int creditWindow) {