[
https://issues.apache.org/jira/browse/ROCKETMQ-112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15889595#comment-15889595
]
Jaskey Lam edited comment on ROCKETMQ-112 at 3/1/17 6:37 AM:
-------------------------------------------------------------
[~zhaoziyan] please reformat your comment accroing to
https://jira.atlassian.com/secure/WikiRendererHelpAction.jspa?section=all
If a new consumer group is starts, most of the case CONSUME_FROM_LAST_OFFSET
will work as expected, please refer to queryConsumerOffset in
ConsumerManageProcessor.java.
It will return the min offset only when the topic is still quite
new(minOffset==0) and no large accumulation(checkInDiskByConsumeOffset=false),
this is to ensure that if a new queue is created, messages should still be
consumed in the newly created queue.
But indeed, this is a bit confusing, I suggest we respect strictly for case of
CONSUME_FROM_LAST_OFFSET and deal with newly created queue more precisely
{code}
if (offset >= 0) {
responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} else {
long minOffset =
this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(),
requestHeader.getQueueId());
if (minOffset <= 0
&&
!this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
//here is why sometims consume_from_last does not work as expected
responseHeader.setOffset(0L);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} else {
response.setCode(ResponseCode.QUERY_NOT_FOUND);
response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group
consumer boot first");
}
}
{code}
was (Author: jaskey):
[~zhaoziyan] please reformat your comment accroing to
https://jira.atlassian.com/secure/WikiRendererHelpAction.jspa?section=all
If a new consumer group is starts, most of the case CONSUME_FROM_LAST_OFFSET
will work as expected, please refer to queryConsumerOffset in
ConsumerManageProcessor.java.
It will return the min offset only when the topic is still quite
new(minOffset==0) and no large accumulation(checkInDiskByConsumeOffset=false),
this is to ensure that if a new queue is created, messages should still be
consumed in the newly created queue.
But indeed, this is a bit confusing, I suggest we respect strictly for case of
CONSUME_FROM_LAST_OFFSET and deal with newly created queue more precisely
{code}
if (offset >= 0) {
responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} else {
long minOffset =
this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(),
requestHeader.getQueueId());
if (minOffset <= 0
&&
!this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
responseHeader.setOffset(0L);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} else {
response.setCode(ResponseCode.QUERY_NOT_FOUND);
response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group
consumer boot first");
}
}
{code}
> MQ client CONSUME_FROM_LAST_OFFSET dont work
> --------------------------------------------
>
> Key: ROCKETMQ-112
> URL: https://issues.apache.org/jira/browse/ROCKETMQ-112
> Project: Apache RocketMQ
> Issue Type: Bug
> Components: rocketmq-client
> Reporter: zhaoziyan
> Assignee: Xiaorui Wang
>
> case CONSUME_FROM_LAST_OFFSET: {
> long lastOffset = offsetStore.readOffset(mq,
> ReadOffsetType.READ_FROM_STORE);
> if (lastOffset >= 0) {
> result = lastOffset;
> }
> // First start,no offset
> else if (-1 == lastOffset) {
> if
> (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
> result = 0L;
> }
> else {
> try {
> result =
> this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
> }
> catch (MQClientException e) {
> result = -1;
> }
> }
> }
> else {
> result = -1;
> }
> break;
> }
> offsetStore.readOffset is minOffset not the maxOffset
> CONSUME_FROM_LAST_OFFSET dont work
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)