LvChenhui opened a new issue #2298:
URL: https://github.com/apache/rocketmq/issues/2298


   Hi all:
       I executed the "reset offset" command from the console platform while the 
consumer client was alive. But when I compared the number of messages, there was a 
high probability that fewer messages were consumed than expected. So I tried to 
reproduce the problem and find the cause.
       I found that when the client accepts the request from the Broker, it 
suspends the consumer, then resets the offset, and finally resumes the consumer. 
But while the consumer is suspended, another thread may already be executing an 
inner method that updates the offset. Here is some of the code I found that is 
involved.
   ```
   // MQClientInstance#resetOffset
   public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
           ....

           consumer.suspend();

           ...
           // find processQueue
           ...

           // Maybe this code is a temporary solution to this problem
           try {
               TimeUnit.SECONDS.sleep(10);
           } catch (InterruptedException e) {
           }

           ...
           // reset offset and drop
           ...

       } finally {
           if (consumer != null) {
               consumer.resume();
           }
       }
   }
   ```
   ```
   // DefaultMQPushConsumerImpl#pullMessage#PullCallback#onSuccess
   PullCallback pullCallback = new PullCallback() {
       @Override
       public void onSuccess(PullResult pullResult) {
           if (pullResult != null) {
               pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                   pullRequest.getMessageQueue(), pullResult, subscriptionData);

               switch (pullResult.getPullStatus()) {
                   case FOUND:
                       ...

                       break;
                   case NO_NEW_MSG:
                       pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                       // update offset
                       DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                       DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                       break;
                   case NO_MATCHED_MSG:
                       pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                       // update offset
                       DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                       DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                       break;
                   case OFFSET_ILLEGAL:
                       ....

                       break;
                   default:
                       break;
               }
           }
       }
   ```
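
To make the suspected interleaving easier to see, here is a minimal, self-contained sketch. It is not RocketMQ code: `QueueOffset`, `simulate`, and the epoch counter are hypothetical names used only for illustration. A latch forces a pull callback that was issued before the reset to write its offset after the reset. The guarded variant shows one possible way to ignore such stale updates; it is an assumption for discussion, not a description of how the client works or should be fixed.

```java
import java.util.concurrent.CountDownLatch;

public class ResetOffsetRaceSketch {

    /** Hypothetical per-queue offset holder; not a RocketMQ class. */
    static final class QueueOffset {
        private long offset;
        private long epoch; // incremented by every reset

        synchronized long currentEpoch() { return epoch; }

        synchronized long get() { return offset; }

        /** Models the reset command: move the offset and invalidate older pulls. */
        synchronized void reset(long newOffset) {
            epoch++;
            offset = newOffset;
        }

        /** Models the unguarded callback path: always overwrites the offset. */
        synchronized void update(long newOffset) { offset = newOffset; }

        /** Guarded variant: drop updates from pulls issued before the last reset. */
        synchronized boolean updateIfCurrent(long newOffset, long epochAtPull) {
            if (epochAtPull != epoch) {
                return false;
            }
            offset = newOffset;
            return true;
        }
    }

    /** Runs one reset while a pull callback is in flight and returns the final offset. */
    static long simulate(boolean guarded, long staleOffset, long resetTo) {
        QueueOffset q = new QueueOffset();
        long epochAtPull = q.currentEpoch(); // the pull was issued before the reset
        CountDownLatch resetDone = new CountDownLatch(1);

        Thread callback = new Thread(() -> {
            try {
                resetDone.await(); // force the callback to run after the reset
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (guarded) {
                q.updateIfCurrent(staleOffset, epochAtPull);
            } else {
                q.update(staleOffset);
            }
        });
        callback.start();

        q.reset(resetTo);
        resetDone.countDown();
        try {
            callback.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return q.get();
    }

    public static void main(String[] args) {
        System.out.println("unguarded: " + simulate(false, 500L, 100L)); // prints "unguarded: 500"
        System.out.println("guarded:   " + simulate(true, 500L, 100L));  // prints "guarded:   100"
    }
}
```

In the unguarded run the reset to 100 is overwritten by the stale value 500, which would match the symptom of fewer messages being consumed after a reset. In the guarded run the stale update is dropped and the offset stays at 100.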

