[ 
https://issues.apache.org/jira/browse/ROCKETMQ-112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15887420#comment-15887420
 ] 

zhaoziyan commented on ROCKETMQ-112:
------------------------------------

    private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, 
RemotingCommand request)
            throws RemotingCommandException {
        final RemotingCommand response =
                
RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
        final QueryConsumerOffsetResponseHeader responseHeader =
                (QueryConsumerOffsetResponseHeader) response.readCustomHeader();
        final QueryConsumerOffsetRequestHeader requestHeader =
                (QueryConsumerOffsetRequestHeader) request
                    
.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);

        long offset =
                this.brokerController.getConsumerOffsetManager().queryOffset(
                    requestHeader.getConsumerGroup(), requestHeader.getTopic(), 
requestHeader.getQueueId());

        if (offset >= 0) {
            responseHeader.setOffset(offset);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
        }
        else {
            long minOffset =
                    
this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(),
                        requestHeader.getQueueId());
            if (minOffset <= 0
                    && 
!this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
                        requestHeader.getTopic(), requestHeader.getQueueId(), 
0)) {
                responseHeader.setOffset(0L);
                response.setCode(ResponseCode.SUCCESS);
                response.setRemark(null);
            }
            else {
                response.setCode(ResponseCode.QUERY_NOT_FOUND);
                response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group 
consumer boot first");
            }
        }

        return response;
    }

queryConsumerOffset if is a new consumer grup,then return the min offset,so the 
CONSUME_FROM_LAST_OFFSET  dont work 

> MQ client CONSUME_FROM_LAST_OFFSET dont work
> --------------------------------------------
>
>                 Key: ROCKETMQ-112
>                 URL: https://issues.apache.org/jira/browse/ROCKETMQ-112
>             Project: Apache RocketMQ
>          Issue Type: Bug
>          Components: rocketmq-client
>            Reporter: zhaoziyan
>            Assignee: Xiaorui Wang
>
>         case CONSUME_FROM_LAST_OFFSET: {
>             long lastOffset = offsetStore.readOffset(mq, 
> ReadOffsetType.READ_FROM_STORE);
>             if (lastOffset >= 0) {
>                 result = lastOffset;
>             }
>             // First start,no offset
>             else if (-1 == lastOffset) {
>                 if 
> (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
>                     result = 0L;
>                 }
>                 else {
>                     try {
>                         result = 
> this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
>                     }
>                     catch (MQClientException e) {
>                         result = -1;
>                     }
>                 }
>             }
>             else {
>                 result = -1;
>             }
>             break;
>         }
> offsetStore.readOffset is minOffset not the maxOffset 
> CONSUME_FROM_LAST_OFFSET dont work



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to