This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 0bd8bdf7f [ISSUE #4568]Improve performance of consume queue (#4569)
0bd8bdf7f is described below
commit 0bd8bdf7f91180d9a0a521b9954848841c41252e
Author: Zhanhui Li <[email protected]>
AuthorDate: Thu Aug 25 13:49:41 2022 +0800
[ISSUE #4568]Improve performance of consume queue (#4569)
* Make two improvements: (1) search from the previous minLogicOffset when
possible; (2) use binary search instead of a linear scan
* Add unit test cases covering the changes
* Optimize the termination condition
* Adapt to latest code
---
.../org/apache/rocketmq/store/ConsumeQueue.java | 121 +++++++++++++++++++--
.../apache/rocketmq/store/ConsumeQueueTest.java | 48 ++++++++
2 files changed, 160 insertions(+), 9 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index d0686d916..10049d54f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -57,6 +57,10 @@ public class ConsumeQueue implements ConsumeQueueInterface,
FileQueueLifeCycle {
private final String storePath;
private final int mappedFileSize;
private long maxPhysicOffset = -1;
+
+ /**
+ * Minimum offset of the consume file queue that points to valid commit
log record.
+ */
private volatile long minLogicOffset = 0;
private ConsumeQueueExt consumeQueueExt = null;
@@ -382,26 +386,125 @@ public class ConsumeQueue implements
ConsumeQueueInterface, FileQueueLifeCycle {
return cnt;
}
+ /**
+ * Update minLogicOffset such that entries after it would point to valid
commit log address.
+ *
+ * @param minCommitLogOffset Minimum commit log offset
+ */
@Override
- public void correctMinOffset(long phyMinOffset) {
+ public void correctMinOffset(long minCommitLogOffset) {
+ // Check if the consume queue is the state of deprecation.
+ if (minLogicOffset >= mappedFileQueue.getMaxOffset()) {
+ log.info("ConsumeQueue[Topic={}, queue-id={}] contains no valid
entries", topic, queueId);
+ return;
+ }
+
+ // Check whether the consume queue maps no valid data at all. This
check may cost 1 IO operation.
+ // The rationale is that consume queue always preserves the last file.
In case there are many deprecated topics,
+ // This check would save a lot of efforts.
+ MappedFile lastMappedFile = this.mappedFileQueue.getLastMappedFile();
+ if (null == lastMappedFile) {
+ return;
+ }
+
+ SelectMappedBufferResult lastRecord = null;
+ try {
+ int maxReadablePosition = lastMappedFile.getReadPosition();
+ lastRecord = lastMappedFile.selectMappedBuffer(maxReadablePosition
- ConsumeQueue.CQ_STORE_UNIT_SIZE,
+ ConsumeQueue.CQ_STORE_UNIT_SIZE);
+ if (null != lastRecord) {
+ ByteBuffer buffer = lastRecord.getByteBuffer();
+ long commitLogOffset = buffer.getLong();
+ if (commitLogOffset < minCommitLogOffset) {
+ // Keep the largest known consume offset, even if this
consume-queue contains no valid entries at
+ // all. Let minLogicOffset point to a future slot.
+ this.minLogicOffset = lastMappedFile.getFileFromOffset() +
maxReadablePosition;
+ log.info("ConsumeQueue[topic={}, queue-id={}] contains no
valid entries. Min-offset is assigned as: {}.",
+ topic, queueId, getMinOffsetInQueue());
+ return;
+ }
+ }
+ } finally {
+ if (null != lastRecord) {
+ lastRecord.release();
+ }
+ }
+
MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();
long minExtAddr = 1;
if (mappedFile != null) {
- SelectMappedBufferResult result = mappedFile.selectMappedBuffer(0);
+ // Search from previous min logical offset. Typically, a consume
queue file segment contains 300,000 entries
+ // searching from previous position saves significant amount of
comparisons and IOs
+ boolean intact = true; // Assume previous value is still valid
+ long start = this.minLogicOffset - mappedFile.getFileFromOffset();
+ if (start < 0) {
+ intact = false;
+ start = 0;
+ }
+
+ if (start > mappedFile.getReadPosition()) {
+ log.error("[Bug][InconsistentState] ConsumeQueue file {}
should have been deleted",
+ mappedFile.getFileName());
+ return;
+ }
+
+ SelectMappedBufferResult result =
mappedFile.selectMappedBuffer((int) start);
if (result == null) {
+ log.warn("[Bug] Failed to scan consume queue entries from file
on correcting min offset: {}",
+ mappedFile.getFileName());
return;
}
+
try {
- for (int i = 0; i < result.getSize(); i +=
ConsumeQueue.CQ_STORE_UNIT_SIZE) {
- long offsetPy = result.getByteBuffer().getLong();
- result.getByteBuffer().getInt();
- long tagsCode = result.getByteBuffer().getLong();
+ // No valid consume entries
+ if (result.getSize() == 0) {
+ log.debug("ConsumeQueue[topic={}, queue-id={}] contains no
valid entries", topic, queueId);
+ return;
+ }
+
+ ByteBuffer buffer = result.getByteBuffer().slice();
+ // Verify whether the previous value is still valid or not
before conducting binary search
+ long commitLogOffset = buffer.getLong();
+ if (intact && commitLogOffset >= minCommitLogOffset) {
+ log.info("Abort correction as previous min-offset points
to {}, which is greater than {}",
+ commitLogOffset, minCommitLogOffset);
+ return;
+ }
+
+ // Binary search between range [previous_min_logic_offset,
first_file_from_offset + file_size)
+ // Note the consume-queue deletion procedure ensures the last
entry points to somewhere valid.
+ int low = 0;
+ int high = result.getSize() - ConsumeQueue.CQ_STORE_UNIT_SIZE;
+ while (true) {
+ if (high - low <= ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+ break;
+ }
+ int mid = (low + high) / 2 /
ConsumeQueue.CQ_STORE_UNIT_SIZE * ConsumeQueue.CQ_STORE_UNIT_SIZE;
+ buffer.position(mid);
+ commitLogOffset = buffer.getLong();
+ if (commitLogOffset > minCommitLogOffset) {
+ high = mid;
+ } else if (commitLogOffset == minCommitLogOffset) {
+ low = mid;
+ high = mid;
+ break;
+ } else {
+ low = mid;
+ }
+ }
+
+ // Examine the last one or two entries
+ for (int i = low; i <= high; i +=
ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+ buffer.position(i);
+ long offsetPy = buffer.getLong();
+ buffer.position(i + 12);
+ long tagsCode = buffer.getLong();
- if (offsetPy >= phyMinOffset) {
- this.minLogicOffset = mappedFile.getFileFromOffset() +
i;
+ if (offsetPy >= minCommitLogOffset) {
+ this.minLogicOffset = mappedFile.getFileFromOffset() +
start + i;
log.info("Compute logical min offset: {}, topic: {},
queueId: {}",
this.getMinOffsetInQueue(), this.topic,
this.queueId);
- // This maybe not take effect, when not every consume
queue has extend file.
+ // This maybe not take effect, when not every consume
queue has an extended file.
if (isExtAddr(tagsCode)) {
minExtAddr = tagsCode;
}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
index 6bbe40dae..9cd2e5702 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.store;
import java.io.File;
+import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -36,6 +37,7 @@ import org.apache.rocketmq.store.queue.ReferredIterator;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
import static org.assertj.core.api.Assertions.assertThat;
@@ -429,4 +431,50 @@ public class ConsumeQueueTest {
UtilAll.deleteFile(new File(storePath));
}
}
+
+ @Test
+ public void testCorrectMinOffset() {
+ String topic = "T1";
+ int queueId = 0;
+ MessageStoreConfig storeConfig = new MessageStoreConfig();
+ File tmpDir = new File(System.getProperty("java.io.tmpdir"),
"test_correct_min_offset");
+ tmpDir.deleteOnExit();
+ storeConfig.setStorePathRootDir(tmpDir.getAbsolutePath());
+ storeConfig.setEnableConsumeQueueExt(false);
+ DefaultMessageStore messageStore =
Mockito.mock(DefaultMessageStore.class);
+
Mockito.when(messageStore.getMessageStoreConfig()).thenReturn(storeConfig);
+
+ RunningFlags runningFlags = new RunningFlags();
+ Mockito.when(messageStore.getRunningFlags()).thenReturn(runningFlags);
+
+ StoreCheckpoint storeCheckpoint = Mockito.mock(StoreCheckpoint.class);
+
Mockito.when(messageStore.getStoreCheckpoint()).thenReturn(storeCheckpoint);
+
+ ConsumeQueue consumeQueue = new ConsumeQueue(topic, queueId,
storeConfig.getStorePathRootDir(),
+ storeConfig.getMappedFileSizeConsumeQueue(), messageStore);
+
+ int max = 10000;
+ int message_size = 100;
+ for (int i = 0; i < max; ++i) {
+ DispatchRequest dispatchRequest = new DispatchRequest(topic,
queueId, message_size * i, message_size, 0, 0, i, null, null, 0, 0, null);
+ consumeQueue.putMessagePositionInfoWrapper(dispatchRequest);
+ }
+
+ consumeQueue.setMinLogicOffset(0L);
+ consumeQueue.correctMinOffset(0L);
+ Assert.assertEquals(0, consumeQueue.getMinOffsetInQueue());
+
+ consumeQueue.setMinLogicOffset(100);
+ consumeQueue.correctMinOffset(2000);
+ Assert.assertEquals(20, consumeQueue.getMinOffsetInQueue());
+
+ consumeQueue.setMinLogicOffset((max - 1) *
ConsumeQueue.CQ_STORE_UNIT_SIZE);
+ consumeQueue.correctMinOffset(max * message_size);
+ Assert.assertEquals(max * ConsumeQueue.CQ_STORE_UNIT_SIZE,
consumeQueue.getMinLogicOffset());
+
+ consumeQueue.setMinLogicOffset(max * ConsumeQueue.CQ_STORE_UNIT_SIZE);
+ consumeQueue.correctMinOffset(max * message_size);
+ Assert.assertEquals(max * ConsumeQueue.CQ_STORE_UNIT_SIZE,
consumeQueue.getMinLogicOffset());
+ consumeQueue.destroy();
+ }
}