This is an automated email from the ASF dual-hosted git repository.

zhangjidi2016 pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta by this push:
     new 499b7dbe8 [ISSUE #4171]Fix tryQueryAssignment() in RebalanceImpl
     new 52e059e28 Merge pull request #4173 from zhangjidi2016/fix_tryQueryAssignment
499b7dbe8 is described below

commit 499b7dbe844558b95567f8467c01c0ac3fccc873
Author: zhangjidi2016 <[email protected]>
AuthorDate: Fri Apr 15 14:36:01 2022 +0800

    [ISSUE #4171]Fix tryQueryAssignment() in RebalanceImpl
---
 .../client/impl/consumer/RebalanceImpl.java        | 38 ++++++++--------------
 1 file changed, 13 insertions(+), 25 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index 5788d1cbe..dff7efd63 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -273,40 +273,28 @@ public abstract class RebalanceImpl {
         if (topicBrokerRebalance.containsKey(topic)) {
             return true;
         }
-
         String strategyName = allocateMessageQueueStrategy != null ? 
allocateMessageQueueStrategy.getName() : null;
-
-        boolean success = false;
-        int i = 0;
-        int timeOut = 0;
-        while (i++ < TIMEOUT_CHECK_TIMES) {
+        int retryTimes = 0;
+        while (retryTimes++ < TIMEOUT_CHECK_TIMES) {
             try {
                 Set<MessageQueueAssignment> resultSet = 
mQClientFactory.queryAssignment(topic, consumerGroup,
-                    strategyName, messageModel, QUERY_ASSIGNMENT_TIMEOUT / 
TIMEOUT_CHECK_TIMES * i);
-                success = true;
-                break;
+                    strategyName, messageModel, QUERY_ASSIGNMENT_TIMEOUT / 
TIMEOUT_CHECK_TIMES * retryTimes);
+                topicBrokerRebalance.put(topic, topic);
+                return true;
             } catch (Throwable t) {
-                if (t instanceof RemotingTimeoutException) {
-                    timeOut++;
-                } else {
+                if (!(t instanceof RemotingTimeoutException)) {
                     log.error("tryQueryAssignment error.", t);
-                    break;
+                    topicClientRebalance.put(topic, topic);
+                    return false;
                 }
             }
         }
-
-        if (success) {
-            topicBrokerRebalance.put(topic, topic);
-            return true;
-        } else {
-            if (timeOut >= TIMEOUT_CHECK_TIMES) {
-                // if never success before and timeout exceed 
TIMEOUT_CHECK_TIMES, force client rebalance
-                topicClientRebalance.put(topic, topic);
-                return false;
-            } else {
-                return true;
-            }
+        if (retryTimes >= TIMEOUT_CHECK_TIMES) {
+            // if never success before and timeout exceed TIMEOUT_CHECK_TIMES, 
force client rebalance
+            topicClientRebalance.put(topic, topic);
+            return false;
         }
+        return true;
     }
 
     public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {

Reply via email to