LvChenhui opened a new issue #2298:
URL: https://github.com/apache/rocketmq/issues/2298
Hi all:
I ran the "reset offset" command from the console platform while the consumer
client was alive. When I compared the message counts afterwards, there was a
high probability that fewer messages had been consumed than expected. So I
tried to reproduce the problem and find the cause.
I found that when the client receives the reset request from the Broker, it
suspends the consumer, then resets the offset, and finally resumes the
consumer. However, at the moment the consumer is suspended, another thread may
already be executing an internal method that updates the offset. Here is the
code I found that is involved.
```
MQClientInstance#resetOffset
public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
    ....
        consumer.suspend();
        ...
        // find processQueue
        ...
        // Maybe this code is a temporary solution to this problem
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
        }
        ...
        // reset offset and drop
        ...
    } finally {
        if (consumer != null) {
            consumer.resume();
        }
    }
}
```
```
DefaultMQPushConsumerImpl#pullMessage#PullCallback#onSuccess
PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                pullRequest.getMessageQueue(), pullResult, subscriptionData);
            switch (pullResult.getPullStatus()) {
                case FOUND:
                    ...
                    break;
                case NO_NEW_MSG:
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    // update offset
                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                case NO_MATCHED_MSG:
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    // update offset
                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                case OFFSET_ILLEGAL:
                    ....
                    break;
                default:
                    break;
            }
        }
    }
```
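To make the suspected interleaving easier to discuss, here is a minimal standalone sketch of it. It is not RocketMQ code: `ResetOffsetRace`, `paused`, `nextOffset`, `simulate()` and the offset values 100, 150 and 0 are all hypothetical stand-ins. It assumes the pause flag is only checked before a pull is issued, and it uses latches to force the pull callback to run after the reset, which is only one possible ordering.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the race; none of these names come from RocketMQ.
class ResetOffsetRace {
    // Stand-ins for the consumer's pause flag and the queue's stored offset.
    static final AtomicBoolean paused = new AtomicBoolean(false);
    static final AtomicLong nextOffset = new AtomicLong(100);

    // Runs one forced interleaving and returns the offset seen after the reset.
    static long simulate() {
        paused.set(false);
        nextOffset.set(100);
        CountDownLatch pullInFlight = new CountDownLatch(1);
        CountDownLatch resetDone = new CountDownLatch(1);

        Thread pullThread = new Thread(() -> {
            // Assumption: the pause flag is consulted only before the pull starts.
            if (paused.get()) {
                return;
            }
            pullInFlight.countDown();
            try {
                // Model a broker response that arrives after the reset finished.
                resetDone.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            // The callback stores its own offset without rechecking the flag.
            nextOffset.set(150);
        });
        pullThread.start();

        try {
            pullInFlight.await();   // a pull is already in flight
            paused.set(true);       // "suspend"
            nextOffset.set(0);      // "reset offset"
            resetDone.countDown();  // the late callback may now run
            pullThread.join();
            paused.set(false);      // "resume"
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return nextOffset.get();
    }

    public static void main(String[] args) {
        // Prints "offset after reset: 150": the reset to 0 was overwritten.
        System.out.println("offset after reset: " + simulate());
    }
}
```

In this model the reset value is lost because the in-flight callback writes last. If the real client behaves similarly, suspending alone would not protect the reset, and a fixed sleep would only narrow the window rather than close it.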