This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new ebf1595418 [ISSUE #10103] Improve batch polling efficiency in
pollIndexRecord method (#10104)
ebf1595418 is described below
commit ebf1595418178d5c73ba80ab08fb3a7312efc25e
Author: yx9o <[email protected]>
AuthorDate: Tue Mar 17 16:22:28 2026 +0800
[ISSUE #10103] Improve batch polling efficiency in pollIndexRecord method
(#10104)
---
.../store/index/rocksdb/IndexRocksDBStore.java | 22 +++++++++++-----------
1 file changed, 11 insertions(+), 11 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/index/rocksdb/IndexRocksDBStore.java
b/store/src/main/java/org/apache/rocketmq/store/index/rocksdb/IndexRocksDBStore.java
index 202cf542b0..38303bf504 100644
---
a/store/src/main/java/org/apache/rocketmq/store/index/rocksdb/IndexRocksDBStore.java
+++
b/store/src/main/java/org/apache/rocketmq/store/index/rocksdb/IndexRocksDBStore.java
@@ -321,18 +321,18 @@ public class IndexRocksDBStore implements
CommitLogDispatchStore {
private void pollIndexRecord() {
try {
IndexRocksDBRecord firstReq = originIndexMsgQueue.poll(100,
TimeUnit.MILLISECONDS);
- if (null != firstReq) {
- irs.add(firstReq);
- while (true) {
- IndexRocksDBRecord tmpReq =
originIndexMsgQueue.poll(100, TimeUnit.MILLISECONDS);
- if (null == tmpReq) {
- break;
- }
- irs.add(tmpReq);
- if (irs.size() >= BATCH_SIZE) {
- break;
- }
+ if (firstReq == null) {
+ return;
+ }
+ irs.add(firstReq);
+ originIndexMsgQueue.drainTo(irs, BATCH_SIZE - irs.size());
+ while (irs.size() < BATCH_SIZE) {
+ IndexRocksDBRecord tmpReq = originIndexMsgQueue.poll(100,
TimeUnit.MILLISECONDS);
+ if (tmpReq == null) {
+ break;
}
+ irs.add(tmpReq);
+ originIndexMsgQueue.drainTo(irs, BATCH_SIZE - irs.size());
}
} catch (Exception e) {
logError.error("IndexRocksDBStore IndexBuildService error:
{}", e.getMessage());