This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 23ee0eaaef [ISSUE #7646] Optimize pull onException logging (#7647)
23ee0eaaef is described below
commit 23ee0eaaef430921da6dbf5361b7e76b9e058f73
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Fri Dec 15 16:49:44 2023 +0800
[ISSUE #7646] Optimize pull onException logging (#7647)
* Optimize SUBSCRIPTION_NOT_LATEST logging
* Add message queue in pullMessage onException
---
.../rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index d2a362ba56..cbde258655 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -332,7 +332,8 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
}
}
- final SubscriptionData subscriptionData =
this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
+ final MessageQueue messageQueue = pullRequest.getMessageQueue();
+ final SubscriptionData subscriptionData =
this.rebalanceImpl.getSubscriptionInner().get(messageQueue.getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest,
pullTimeDelayMillsWhenException);
log.warn("find the consumer's subscription failed, {}",
pullRequest);
@@ -433,7 +434,11 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
@Override
public void onException(Throwable e) {
if
(!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))
{
- log.warn("execute the pull request exception", e);
+ if (e instanceof MQBrokerException && ((MQBrokerException)
e).getResponseCode() == ResponseCode.SUBSCRIPTION_NOT_LATEST) {
+ log.warn("the subscription is not latest, group={},
messageQueue={}", groupName(), messageQueue);
+ } else {
+ log.warn("execute the pull request exception,
group={}, messageQueue={}", groupName(), messageQueue, e);
+ }
}
if (e instanceof MQBrokerException && ((MQBrokerException)
e).getResponseCode() == ResponseCode.FLOW_CONTROL) {