bxfjb opened a new issue, #10076:
URL: https://github.com/apache/rocketmq/issues/10076
### Before Creating the Enhancement Request
- [x] I have confirmed that this should be classified as an enhancement
rather than a bug/feature.
### Summary
```
public synchronized void resetOffset(String topic, String group,
Map<MessageQueue, Long> offsetTable) {
DefaultMQPushConsumerImpl consumer = null;
try {
...
ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable =
consumer.getRebalanceImpl().getProcessQueueTable();
for (Map.Entry<MessageQueue, ProcessQueue> entry :
processQueueTable.entrySet()) {
MessageQueue mq = entry.getKey();
if (topic.equals(mq.getTopic()) &&
offsetTable.containsKey(mq)) {
ProcessQueue pq = entry.getValue();
pq.setDropped(true);
pq.clear();
}
}
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException ignored) {
}
...
```
Consumer need to wait fixed 10 seconds after recieve broker's reset offset
request, which is neither elegant nor efficient. The current implementation
could cause badly consumption pause with multiple and dense request. We may
check each process queue's status with a max waiting time until they are all
clear.
### Motivation
To polish code implementation and avoid perhaps performance side effect.
### Describe the Solution You'd Like
Implement a neat polling for every process queue.
### Describe Alternatives You've Considered
More complicated logic like `CountDownLatch` may help too.
### Additional Context
_No response_
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]