[
https://issues.apache.org/jira/browse/ROCKETMQ-112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15887420#comment-15887420
]
zhaoziyan commented on ROCKETMQ-112:
------------------------------------
private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
    final RemotingCommand response =
        RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
    final QueryConsumerOffsetResponseHeader responseHeader =
        (QueryConsumerOffsetResponseHeader) response.readCustomHeader();
    final QueryConsumerOffsetRequestHeader requestHeader =
        (QueryConsumerOffsetRequestHeader) request
            .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);

    long offset =
        this.brokerController.getConsumerOffsetManager().queryOffset(
            requestHeader.getConsumerGroup(), requestHeader.getTopic(),
            requestHeader.getQueueId());

    if (offset >= 0) {
        responseHeader.setOffset(offset);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
    }
    else {
        long minOffset =
            this.brokerController.getMessageStore().getMinOffsetInQuque(
                requestHeader.getTopic(), requestHeader.getQueueId());
        if (minOffset <= 0
            && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
                requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
            responseHeader.setOffset(0L);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
        }
        else {
            response.setCode(ResponseCode.QUERY_NOT_FOUND);
            response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
        }
    }

    return response;
}
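For a new consumer group, queryConsumerOffset can return the min offset (offset 0
with ResponseCode.SUCCESS) instead of QUERY_NOT_FOUND. The client then sees a
valid stored offset and never asks for maxOffset, so CONSUME_FROM_LAST_OFFSET
doesn't work.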
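
The interaction between the broker handler above and the client-side CONSUME_FROM_LAST_OFFSET case quoted in the issue description can be reproduced with a small stand-alone sketch. None of the names below are RocketMQ APIs: OffsetSketch, brokerQueryOffset and computePullFromLastOffset are hypothetical stand-ins that only mirror the branch structure of the two snippets. The sketch also assumes that the client maps a QUERY_NOT_FOUND reply to -1, and that checkInDiskByConsumeOffset reports whether offset 0 has already been moved out of memory, as its name suggests.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the broker/client interaction; not RocketMQ code.
class OffsetSketch {
    // Assumption: the client turns a QUERY_NOT_FOUND reply into -1.
    static final long QUERY_NOT_FOUND = -1L;

    // Broker side: same branches as queryConsumerOffset in this comment.
    static long brokerQueryOffset(Map<String, Long> storedOffsets, String group,
                                  long minOffset, boolean offsetZeroOnDisk) {
        Long stored = storedOffsets.get(group);
        if (stored != null && stored >= 0) {
            return stored;              // group already committed an offset
        }
        if (minOffset <= 0 && !offsetZeroOnDisk) {
            return 0L;                  // SUCCESS with offset 0 for a brand-new group
        }
        return QUERY_NOT_FOUND;
    }

    // Client side: CONSUME_FROM_LAST_OFFSET case, retry-topic branch omitted.
    static long computePullFromLastOffset(long lastOffset, long maxOffset) {
        if (lastOffset >= 0) {
            return lastOffset;          // a stored offset wins
        } else if (lastOffset == -1) {
            return maxOffset;           // first start: begin at the end of the queue
        }
        return -1;
    }

    public static void main(String[] args) {
        Map<String, Long> stored = new HashMap<>();   // no group has committed yet
        long maxOffset = 500L;                        // illustrative queue tail

        long inMemory = brokerQueryOffset(stored, "newGroup", 0L, false);
        System.out.println("offset 0 still in memory -> start at "
            + computePullFromLastOffset(inMemory, maxOffset));

        long onDisk = brokerQueryOffset(stored, "newGroup", 0L, true);
        System.out.println("offset 0 already on disk -> start at "
            + computePullFromLastOffset(onDisk, maxOffset));
    }
}
```

Under these assumptions, a new group only starts from the end of the queue when the broker answers QUERY_NOT_FOUND. When the broker answers SUCCESS with offset 0, the first branch of the client code is taken and consumption starts from the beginning.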
> MQ client CONSUME_FROM_LAST_OFFSET dont work
> --------------------------------------------
>
> Key: ROCKETMQ-112
> URL: https://issues.apache.org/jira/browse/ROCKETMQ-112
> Project: Apache RocketMQ
> Issue Type: Bug
> Components: rocketmq-client
> Reporter: zhaoziyan
> Assignee: Xiaorui Wang
>
> case CONSUME_FROM_LAST_OFFSET: {
>     long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
>     if (lastOffset >= 0) {
>         result = lastOffset;
>     }
>     // First start,no offset
>     else if (-1 == lastOffset) {
>         if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
>             result = 0L;
>         }
>         else {
>             try {
>                 result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
>             }
>             catch (MQClientException e) {
>                 result = -1;
>             }
>         }
>     }
>     else {
>         result = -1;
>     }
>     break;
> }
> offsetStore.readOffset returns the min offset, not the max offset, so the
> maxOffset branch is never reached and CONSUME_FROM_LAST_OFFSET doesn't work.