xxd763795151 opened a new issue #571:
URL: https://github.com/apache/rocketmq-client-go/issues/571
**BUG REPORT**
**Please add the branch name [Native]/[Master] at the header of the Issue
title.**
1. Please describe the issue you observed:
Messages were deleted because they expired, and afterwards the consume
offset is not updated correctly.
- What did you do (The steps to reproduce)?
1. Create a topic "test_topic" and send some messages.
2. After a few days, start a consumer group to consume these messages, which
have since been deleted because of expiration. For example, each queue's
broker offset is now 2020 and the consume offset is 0.
Note: do not send any messages in the meantime.
- What did you expect to see?
When my consumer starts, the consume offset should be corrected to the right
value, such as 2020, but it stays at 0.
The Java client does not have this problem.
- What did you see instead?
The offset is only corrected after I start the Java client once.
2. Please tell us about your environment:
- What is your OS?
macOS
- What is your client version?
Go client V4.5.2; the Java client used for the offset correction is 4.7.1
- What is your RocketMQ version?
4.7.1
3. Other information (e.g. detailed explanation, logs, related issues,
suggestions on how to fix, etc):
Go client code (push_consumer.go):
```
case primitive.PullNoNewMsg:
	rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d no more msg, current offset: %d, next offset: %d",
		request.mq.Topic, request.mq.QueueId,
		pullRequest.QueueOffset, result.NextBeginOffset), nil)
```
This branch only writes a log entry. And when there are no messages, the
consume offset is never updated:
```
func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.MessageQueue) {
	msgs := pq.getMessages()
	if msgs == nil {
		return
	}
	// With no messages we return above, so nothing below runs.
	for count := 0; count < len(msgs); count++ {
		var subMsgs []*primitive.MessageExt
		....
	}
	......
	offset := pq.removeMessage(subMsgs...)
	if offset >= 0 && !pq.IsDroppd() {
		pc.storage.update(mq, int64(offset), true)
	}
	.....
}
```
Compare this with the following Java code
(org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage):
```
case NO_NEW_MSG:
    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    // Correct the offset: it is updated to the offset in the result (the correct offset returned by the broker)
    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    break;
```
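A minimal sketch of what an equivalent correction could look like on the Go side. Every type and function here (`pullResult`, `offsetStore`, `correctOffset`) is a hypothetical stand-in written for illustration, not the actual rocketmq-client-go API. The guard that only corrects the offset when no messages are cached locally is an assumption modelled on the Java client's behaviour.

```go
package main

import "fmt"

// pullStatus is a hypothetical stand-in for the client's pull status values.
type pullStatus int

const (
	pullFound pullStatus = iota
	pullNoNewMsg
)

// pullResult models only the fields this sketch needs.
type pullResult struct {
	Status          pullStatus
	NextBeginOffset int64
}

// offsetStore is a hypothetical in-memory offset store keyed by queue ID.
type offsetStore struct {
	offsets map[int]int64
}

func newOffsetStore() *offsetStore {
	return &offsetStore{offsets: map[int]int64{}}
}

// update records the offset for a queue. With increaseOnly set,
// the stored offset never moves backwards.
func (s *offsetStore) update(queueID int, offset int64, increaseOnly bool) {
	if cur, ok := s.offsets[queueID]; ok && increaseOnly && offset <= cur {
		return
	}
	s.offsets[queueID] = offset
}

// correctOffset moves the consume offset to the broker's next begin offset
// when the pull returned no new messages and nothing is cached locally
// (assumed guard, so in-flight messages are not skipped).
func correctOffset(s *offsetStore, queueID int, cachedMsgCount int, r pullResult) {
	if r.Status == pullNoNewMsg && cachedMsgCount == 0 {
		s.update(queueID, r.NextBeginOffset, true)
	}
}

func main() {
	store := newOffsetStore()
	store.update(0, 0, false) // consume offset starts at 0, as in the report

	// The broker reports no new messages and a next begin offset of 2020.
	correctOffset(store, 0, 0, pullResult{Status: pullNoNewMsg, NextBeginOffset: 2020})
	fmt.Println(store.offsets[0])
}
```

With the values from this report (consume offset 0, broker offset 2020), the sketch moves the stored offset for the queue to 2020 without any message being consumed.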