chenyx7 opened a new issue #2449:
URL: https://github.com/apache/rocketmq/issues/2449


   
   **BUG REPORT**
   
   1. Invoke the admin method resetOffsetByTimestamp more than once within 10s.
   
   2. Result: within one consumer group, the GROUP_GET_NUMS statistic is two or more times the TOPIC_PUT_NUMS statistic.
   
![image](https://user-images.githubusercontent.com/46484030/99966659-9966d480-2dd1-11eb-8428-1e1bf0637978.png)
   
![image](https://user-images.githubusercontent.com/46484030/99966883-f5c9f400-2dd1-11eb-9d35-fdc3794aed00.png)
   
   3. Although repeated consumption is normal for RocketMQ, I believe this is not the normal kind of repetition.
   
   4. Reason: the method org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset is not thread-safe. Concurrent calls can leave more than one PullRequest for the same queue in pullRequestQueue, which causes messages to be consumed repeatedly and continuously until the consumer restarts.
   
   5. Analysis:
   
    public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
        // (The lookup that assigns `consumer` for `group` is omitted from this excerpt.)
        try {
            consumer.suspend();

            ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
            for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
                MessageQueue mq = entry.getKey();
                if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
                    ProcessQueue pq = entry.getValue();

                    // If thread0 arrives here at 5s and thread1 arrives here at 10s, both drop the same ProcessQueue.
                    pq.setDropped(true);
                    pq.clear();
                }
            }

            try {
                // Both thread0 and thread1 block here.
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
            }

            Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
            while (iterator.hasNext()) {
                MessageQueue mq = iterator.next();
                Long offset = offsetTable.get(mq);
                if (topic.equals(mq.getTopic()) && offset != null) {
                    try {
                        consumer.updateConsumeOffset(mq, offset);
                        consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));

                        // Here thread1 removes the ProcessQueue created by thread0, but that ProcessQueue is not marked as dropped,
                        // so the PullRequest created by thread0 never exits.
                        iterator.remove();
                    } catch (Exception e) {
                        log.warn("reset offset failed. group={}, {}", group, mq, e);
                    }
                }
            }
        } finally {
            if (consumer != null) {
                consumer.resume();
            }
        }
    }
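
One possible mitigation for the overlap described above is to serialize reset operations per consumer group, so a second call cannot enter the drop/sleep/remove sequence while the first is still in it. The following is only a minimal sketch under that assumption: `ResetOffsetGuard` and `runExclusively` are hypothetical names that do not exist in RocketMQ, and the sketch uses only JDK classes. It is not the project's actual fix.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical helper (not part of RocketMQ): ensures that at most one
 * reset action runs at a time for a given consumer group.
 */
public class ResetOffsetGuard {
    // One lock per consumer group name, created lazily on first use.
    private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    /**
     * Runs resetAction while holding the lock for the given group.
     * A concurrent caller for the same group waits until the first one
     * has finished; callers for different groups do not block each other.
     */
    public void runExclusively(String group, Runnable resetAction) {
        ReentrantLock lock = locks.computeIfAbsent(group, g -> new ReentrantLock());
        lock.lock();
        try {
            resetAction.run();
        } finally {
            // Always release, even if the action throws.
            lock.unlock();
        }
    }
}
```

Under this sketch, the body of resetOffset would be passed as the `resetAction`, so thread1 in the scenario above would only start dropping process queues after thread0 has finished removing them and resumed the consumer. The trade-off is that a second reset request waits for the full duration of the first, including the 10s sleep.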


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

