This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d63373a152 [ISSUE #8957] Remove excess traffic and fix cache
inconsistencies (#8958)
d63373a152 is described below
commit d63373a152ebd395cdce6a2e04e01b62e54c76af
Author: hqbfz <[email protected]>
AuthorDate: Wed Dec 25 14:38:57 2024 +0800
[ISSUE #8957] Remove excess traffic and fix cache inconsistencies (#8958)
---
.../client/impl/consumer/RebalanceImpl.java | 53 +---------------------
1 file changed, 1 insertion(+), 52 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index d1f0d116e0..b6f1d99b1c 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -36,7 +36,6 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageQueueAssignment;
import org.apache.rocketmq.common.message.MessageRequestMode;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
@@ -60,12 +59,8 @@ public abstract class RebalanceImpl {
protected MessageModel messageModel;
protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
protected MQClientInstance mQClientFactory;
- private static final int TIMEOUT_CHECK_TIMES = 3;
private static final int QUERY_ASSIGNMENT_TIMEOUT = 3000;
- private Map<String, String> topicBrokerRebalance = new
ConcurrentHashMap<>();
- private Map<String, String> topicClientRebalance = new
ConcurrentHashMap<>();
-
public RebalanceImpl(String consumerGroup, MessageModel messageModel,
AllocateMessageQueueStrategy allocateMessageQueueStrategy,
MQClientInstance mQClientFactory) {
@@ -241,7 +236,7 @@ public abstract class RebalanceImpl {
for (final Map.Entry<String, SubscriptionData> entry :
subTable.entrySet()) {
final String topic = entry.getKey();
try {
- if (!clientRebalance(topic) && tryQueryAssignment(topic)) {
+ if (!clientRebalance(topic)) {
boolean result =
this.getRebalanceResultFromBroker(topic, isOrder);
if (!result) {
balanced = false;
@@ -266,38 +261,6 @@ public abstract class RebalanceImpl {
return balanced;
}
- private boolean tryQueryAssignment(String topic) {
- if (topicClientRebalance.containsKey(topic)) {
- return false;
- }
-
- if (topicBrokerRebalance.containsKey(topic)) {
- return true;
- }
- String strategyName = allocateMessageQueueStrategy != null ?
allocateMessageQueueStrategy.getName() : null;
- int retryTimes = 0;
- while (retryTimes++ < TIMEOUT_CHECK_TIMES) {
- try {
- Set<MessageQueueAssignment> resultSet =
mQClientFactory.queryAssignment(topic, consumerGroup,
- strategyName, messageModel, QUERY_ASSIGNMENT_TIMEOUT /
TIMEOUT_CHECK_TIMES * retryTimes);
- topicBrokerRebalance.put(topic, topic);
- return true;
- } catch (Throwable t) {
- if (!(t instanceof RemotingTimeoutException)) {
- log.error("tryQueryAssignment error.", t);
- topicClientRebalance.put(topic, topic);
- return false;
- }
- }
- }
- if (retryTimes >= TIMEOUT_CHECK_TIMES) {
- // if never success before and timeout exceed TIMEOUT_CHECK_TIMES,
force client rebalance
- topicClientRebalance.put(topic, topic);
- return false;
- }
- return true;
- }
-
public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
return subscriptionInner;
}
@@ -460,20 +423,6 @@ public abstract class RebalanceImpl {
}
}
}
-
- Iterator<Map.Entry<String, String>> clientIter =
topicClientRebalance.entrySet().iterator();
- while (clientIter.hasNext()) {
- if (!subTable.containsKey(clientIter.next().getKey())) {
- clientIter.remove();
- }
- }
-
- Iterator<Map.Entry<String, String>> brokerIter =
topicBrokerRebalance.entrySet().iterator();
- while (brokerIter.hasNext()) {
- if (!subTable.containsKey(brokerIter.next().getKey())) {
- brokerIter.remove();
- }
- }
}
private boolean updateProcessQueueTableInRebalance(final String topic,
final Set<MessageQueue> mqSet,