guyinyou commented on code in PR #6417:
URL: https://github.com/apache/rocketmq/pull/6417#discussion_r1147087894


##########
client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java:
##########
@@ -1036,6 +1026,25 @@ private void addResultToQueue(PullResult pullResult, ProcessQueue processQueue)
                     break;
             }
             updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue);
+
+            if (!this.isCancelled()) {
+                scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
+            } else {
+                log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
+            }
+        }
+
+        private void handleError(Throwable e) {
+            if (e instanceof InterruptedException) {
+                log.warn("Polling thread was interrupted.", e);
+            } else {
+                if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
+                    pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL;
+                } else {
+                    pullDelayTimeMills = pullTimeDelayMillsWhenException;

Review Comment:
   The delay for the next run (`pullDelayTimeMills`) is updated here, but the task is never rescheduled on this error path, so the new delay is never used.
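
One possible way to address this, as a minimal sketch rather than the actual change: let the error handler only choose the delay, and do the rescheduling in a `finally` block so it runs on both the success and the failure path. The class `ReschedulingTaskSketch`, its constants, and the always-failing `doPull()` are hypothetical stand-ins for illustration and are not part of `DefaultLitePullConsumerImpl`.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the pull task; names and values are illustrative only.
class ReschedulingTaskSketch implements Runnable {
    static final long NORMAL_DELAY_MILLIS = 0L;
    static final long DELAY_ON_EXCEPTION_MILLIS = 50L;

    private final ScheduledThreadPoolExecutor executor;
    private final AtomicInteger runs = new AtomicInteger();
    private volatile boolean cancelled;
    private long pullDelayTimeMills = NORMAL_DELAY_MILLIS;

    ReschedulingTaskSketch(ScheduledThreadPoolExecutor executor) {
        this.executor = executor;
    }

    void cancel() {
        cancelled = true;
    }

    int runCount() {
        return runs.get();
    }

    // Simulates a pull that always fails, to exercise the error path.
    private void doPull() {
        runs.incrementAndGet();
        throw new IllegalStateException("simulated pull failure");
    }

    // Only picks the delay; rescheduling is left to run().
    private void handleError(Throwable e) {
        pullDelayTimeMills = DELAY_ON_EXCEPTION_MILLIS;
    }

    @Override
    public void run() {
        if (cancelled) {
            return;
        }
        try {
            pullDelayTimeMills = NORMAL_DELAY_MILLIS;
            doPull();
        } catch (Throwable e) {
            handleError(e);
        } finally {
            // Reschedule on every path, so the delay chosen in handleError is applied.
            if (!cancelled) {
                executor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
            }
        }
    }
}
```

With this shape the task keeps running after an exception and waits for the error delay before the next attempt. Whether a `finally` block or an explicit `schedule` call inside the error handler fits better depends on how the rest of the task is structured.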



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
