RongtongJin commented on code in PR #6417:
URL: https://github.com/apache/rocketmq/pull/6417#discussion_r1154175155
##########
client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java:
##########
@@ -962,43 +972,83 @@ public void run() {
subscriptionData =
FilterAPI.buildSubscriptionData(topic, subExpression4Assign);
}
- PullResult pullResult = pull(messageQueue,
subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
- if (this.isCancelled() || processQueue.isDropped()) {
- return;
- }
- switch (pullResult.getPullStatus()) {
- case FOUND:
- final Object objLock =
messageQueueLock.fetchLockObject(messageQueue);
- synchronized (objLock) {
- if (pullResult.getMsgFoundList() != null &&
!pullResult.getMsgFoundList().isEmpty() &&
assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
-
processQueue.putMessage(pullResult.getMsgFoundList());
- submitConsumeRequest(new
ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
+ PullResult pullResult;
+ switch (communicationMode) {
+ case ASYNC:
+ PullCallback pullCallback = new PullCallback() {
+ @Override
+ public void onSuccess(PullResult pullResult) {
+ addResultToQueue(pullResult, processQueue);
+ executeThreadWithDelay();
}
- }
- break;
- case OFFSET_ILLEGAL:
- log.warn("The pull request offset illegal, {}",
pullResult.toString());
+
+ @Override
+ public void onException(Throwable e) {
+ handleError(e);
+ executeThreadWithDelay();
+ }
+ };
+ pull(messageQueue, subscriptionData, offset,
defaultLitePullConsumer.getPullBatchSize(), pullCallback);
+ return;
+ case SYNC:
+ pullResult = pull(messageQueue, subscriptionData,
offset, defaultLitePullConsumer.getPullBatchSize());
+
+ addResultToQueue(pullResult, processQueue);
break;
default:
+ assert false;
break;
}
- updatePullOffset(messageQueue,
pullResult.getNextBeginOffset(), processQueue);
- } catch (InterruptedException interruptedException) {
- log.warn("Polling thread was interrupted.",
interruptedException);
- } catch (Throwable e) {
- if (e instanceof MQBrokerException && ((MQBrokerException)
e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
- pullDelayTimeMills =
PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL;
- } else {
- pullDelayTimeMills = pullTimeDelayMillsWhenException;
- }
- log.error("An error occurred in pull message process.", e);
+ } catch (Exception e) {
+ // For pull() sync and FilterAPI.buildSubscriptionData
+ handleError(e);
}
+ executeThreadWithDelay();
+ }
+ }
- if (!this.isCancelled()) {
- scheduledThreadPoolExecutor.schedule(this,
pullDelayTimeMills, TimeUnit.MILLISECONDS);
+ private void addResultToQueue(PullResult pullResult, ProcessQueue
processQueue) {
+ if (this.isCancelled() || processQueue.isDropped()) {
+ return;
+ }
+
+ switch (pullResult.getPullStatus()) {
+ case FOUND:
+ final Object objLock =
messageQueueLock.fetchLockObject(messageQueue);
+ synchronized (objLock) {
+ if (pullResult.getMsgFoundList() != null &&
!pullResult.getMsgFoundList().isEmpty() &&
assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
+
processQueue.putMessage(pullResult.getMsgFoundList());
+ submitConsumeRequest(new
ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
+ }
+ }
+ break;
+ case OFFSET_ILLEGAL:
+ log.warn("The pull request offset illegal, {}",
pullResult.toString());
+ break;
+ default:
+ break;
+ }
+ updatePullOffset(messageQueue, pullResult.getNextBeginOffset(),
processQueue);
+ }
+
+ private void handleError(Throwable e) {
+ if (e instanceof InterruptedException) {
+ log.warn("Polling thread was interrupted.", e);
+ } else {
+ if (e instanceof MQBrokerException && ((MQBrokerException)
e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
+ pullDelayTimeMills =
PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL;
} else {
- log.warn("The Pull Task is cancelled after doPullTask,
{}", messageQueue);
+ pullDelayTimeMills = pullTimeDelayMillsWhenException;
}
+ log.error("An error occurred in pull message process.", e);
+ }
+ }
+
+ private void executeThreadWithDelay() {
Review Comment:
How about rename to schedulePullTaskWithDelay?
##########
example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerAsync.java:
##########
@@ -0,0 +1,42 @@
+package org.apache.rocketmq.example.simple;
Review Comment:
IMO, no need for a new example.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]