This is an automated email from the ASF dual-hosted git repository.
zhangjidi2016 pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta by this push:
new 499b7dbe8 [ISSUE #4171]Fix tryQueryAssignment() in RebalanceImpl
new 52e059e28 Merge pull request #4173 from zhangjidi2016/fix_tryQueryAssignment
499b7dbe8 is described below
commit 499b7dbe844558b95567f8467c01c0ac3fccc873
Author: zhangjidi2016 <[email protected]>
AuthorDate: Fri Apr 15 14:36:01 2022 +0800
[ISSUE #4171]Fix tryQueryAssignment() in RebalanceImpl
---
.../client/impl/consumer/RebalanceImpl.java | 38 ++++++++--------------
1 file changed, 13 insertions(+), 25 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index 5788d1cbe..dff7efd63 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -273,40 +273,28 @@ public abstract class RebalanceImpl {
if (topicBrokerRebalance.containsKey(topic)) {
return true;
}
-
String strategyName = allocateMessageQueueStrategy != null ?
allocateMessageQueueStrategy.getName() : null;
-
- boolean success = false;
- int i = 0;
- int timeOut = 0;
- while (i++ < TIMEOUT_CHECK_TIMES) {
+ int retryTimes = 0;
+ while (retryTimes++ < TIMEOUT_CHECK_TIMES) {
try {
Set<MessageQueueAssignment> resultSet =
mQClientFactory.queryAssignment(topic, consumerGroup,
- strategyName, messageModel, QUERY_ASSIGNMENT_TIMEOUT /
TIMEOUT_CHECK_TIMES * i);
- success = true;
- break;
+ strategyName, messageModel, QUERY_ASSIGNMENT_TIMEOUT /
TIMEOUT_CHECK_TIMES * retryTimes);
+ topicBrokerRebalance.put(topic, topic);
+ return true;
} catch (Throwable t) {
- if (t instanceof RemotingTimeoutException) {
- timeOut++;
- } else {
+ if (!(t instanceof RemotingTimeoutException)) {
log.error("tryQueryAssignment error.", t);
- break;
+ topicClientRebalance.put(topic, topic);
+ return false;
}
}
}
-
- if (success) {
- topicBrokerRebalance.put(topic, topic);
- return true;
- } else {
- if (timeOut >= TIMEOUT_CHECK_TIMES) {
- // if never success before and timeout exceed
TIMEOUT_CHECK_TIMES, force client rebalance
- topicClientRebalance.put(topic, topic);
- return false;
- } else {
- return true;
- }
+ if (retryTimes >= TIMEOUT_CHECK_TIMES) {
+ // if never success before and timeout exceed TIMEOUT_CHECK_TIMES,
force client rebalance
+ topicClientRebalance.put(topic, topic);
+ return false;
}
+ return true;
}
public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {