This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit d429372ea90241d994dade58fe6955ecc4e4b8f9 Author: Aaron Ai <[email protected]> AuthorDate: Wed Nov 2 10:17:14 2022 +0800 Revert "Define index of LoadBalancer as static (#272)" This reverts commit 105640858ae1d9c71e2055ce12f8bc55720c3286. --- .../rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java | 5 +++-- .../rocketmq/client/java/impl/producer/PublishingLoadBalancer.java | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java index b9b096e..012d441 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java @@ -34,13 +34,14 @@ public class SubscriptionLoadBalancer { /** * Index for round-robin. */ - private static final AtomicInteger INDEX = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE)); + private final AtomicInteger index; /** * Message queues to receive message. */ private final ImmutableList<MessageQueueImpl> messageQueues; public SubscriptionLoadBalancer(TopicRouteData topicRouteData) { + this.index = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE)); final List<MessageQueueImpl> mqs = topicRouteData.getMessageQueues().stream() .filter((Predicate<MessageQueueImpl>) mq -> mq.getPermission().isReadable() && Utilities.MASTER_BROKER_ID == mq.getBroker().getId()) @@ -52,7 +53,7 @@ public class SubscriptionLoadBalancer { } public MessageQueueImpl takeMessageQueue() { - final int next = INDEX.getAndIncrement(); + final int next = index.getAndIncrement(); return messageQueues.get(IntMath.mod(next, messageQueues.size())); } } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java index 2c9597e..feb9616 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java @@ -43,13 +43,14 @@ public class PublishingLoadBalancer { /** * Index for round-robin. */ - private static final AtomicInteger INDEX = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE)); + private final AtomicInteger index; /** * Message queues to send message. */ private final ImmutableList<MessageQueueImpl> messageQueues; public PublishingLoadBalancer(TopicRouteData topicRouteData) { + this.index = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE)); final List<MessageQueueImpl> mqs = topicRouteData.getMessageQueues().stream() .filter((Predicate<MessageQueueImpl>) mq -> mq.getPermission().isWritable() && Utilities.MASTER_BROKER_ID == mq.getBroker().getId()) @@ -67,7 +68,7 @@ public class PublishingLoadBalancer { } public List<MessageQueueImpl> takeMessageQueues(Set<Endpoints> excluded, int count) { - int next = INDEX.getAndIncrement(); + int next = index.getAndIncrement(); List<MessageQueueImpl> candidates = new ArrayList<>(); Set<String> candidateBrokerNames = new HashSet<>();
