chenyx7 opened a new issue #2449: URL: https://github.com/apache/rocketmq/issues/2449
**BUG REPORT**

1. Invoke the admin method resetOffsetByTimestamp more than once within 10s.
2. Result: for a single consumerGroup, the GROUP_GET_NUMS statistic is two or more times the TOPIC_PUT_NUMS.
3. Although repeated consumption is normal for rmq, I believe this is not the normal kind of repetition.
4. Reason: the method org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset is not thread-safe. Concurrent calls can leave more than one PullRequest for the same queue in pullRequestQueue, which causes messages to be consumed repeatedly and continuously until the consumer restarts.
5. Analysis:

    public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
        // (the consumer lookup and the opening of this try block are omitted in the report)
        try {
            consumer.suspend();

            ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
            for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
                MessageQueue mq = entry.getKey();
                if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
                    ProcessQueue pq = entry.getValue();
                    // if thread0 gets here at 5s and thread1 gets here at 10s, they both drop the same ProcessQueue
                    pq.setDropped(true);
                    pq.clear();
                }
            }

            try {
                // both thread0 and thread1 block here
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
            }

            Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
            while (iterator.hasNext()) {
                MessageQueue mq = iterator.next();
                Long offset = offsetTable.get(mq);
                if (topic.equals(mq.getTopic()) && offset != null) {
                    try {
                        consumer.updateConsumeOffset(mq, offset);
                        consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
                        // here thread1 removes the ProcessQueue created by thread0, but that ProcessQueue is not dropped,
                        // so the PullRequest created by thread0 never exits
                        iterator.remove();
                    } catch (Exception e) {
                        log.warn("reset offset failed. group={}, {}", group, mq, e);
                    }
                }
            }
        } finally {
            if (consumer != null) {
                consumer.resume();
            }
        }
    }
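The interleaving described in the report can be replayed single-threaded to make the leak easier to see. The following is only a sketch under stated assumptions, not RocketMQ code: `FakeProcessQueue`, `rebalance`, and `livePullLoops` are hypothetical stand-ins, each "pull loop" models a PullRequest that keeps running until its ProcessQueue is marked dropped, and a rebalance is assumed to run between the two removal steps. The `dropBeforeRemove` flag illustrates one possible mitigation (marking the queue dropped before removing it), not necessarily the fix adopted upstream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ResetOffsetRaceSketch {
    /** Hypothetical stand-in for ProcessQueue: only the dropped flag matters here. */
    static final class FakeProcessQueue {
        boolean dropped;
    }

    /** Models a rebalance: if the queue has no ProcessQueue, create one and start a pull loop for it. */
    static void rebalance(Map<String, FakeProcessQueue> table, List<FakeProcessQueue> pullLoops, String mq) {
        if (!table.containsKey(mq)) {
            FakeProcessQueue pq = new FakeProcessQueue();
            table.put(mq, pq);
            pullLoops.add(pq); // one entry per PullRequest ever scheduled
        }
    }

    /** Replays two overlapping resetOffset calls and returns how many pull loops are still alive. */
    static int livePullLoops(boolean dropBeforeRemove) {
        Map<String, FakeProcessQueue> table = new HashMap<>();
        List<FakeProcessQueue> pullLoops = new ArrayList<>();
        String mq = "queue-0";

        rebalance(table, pullLoops, mq);   // initial assignment creates queue A

        table.get(mq).dropped = true;      // thread0, first loop: drops A, then sleeps
        table.get(mq).dropped = true;      // thread1, first loop: drops A again, then sleeps

        table.remove(mq);                  // thread0, second loop: removes A (already dropped)
        rebalance(table, pullLoops, mq);   // assumed rebalance creates queue B

        FakeProcessQueue current = table.remove(mq); // thread1, second loop: removes B
        if (dropBeforeRemove && current != null) {
            current.dropped = true;        // mitigation sketch: stop B's pull loop before forgetting it
        }
        rebalance(table, pullLoops, mq);   // next rebalance creates queue C

        int alive = 0;
        for (FakeProcessQueue pq : pullLoops) {
            if (!pq.dropped) {
                alive++;
            }
        }
        return alive;
    }

    public static void main(String[] args) {
        System.out.println(livePullLoops(false)); // prints 2: B and C both keep pulling the same queue
        System.out.println(livePullLoops(true));  // prints 1: only C keeps pulling
    }
}
```

Another option, also only a suggestion, would be to serialize calls to resetOffset (for example with a lock or `synchronized`) so that the two loops of one call cannot interleave with those of another.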