This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0bd8bdf7f [ISSUE #4568]Improve performance of consume queue (#4569)
0bd8bdf7f is described below

commit 0bd8bdf7f91180d9a0a521b9954848841c41252e
Author: Zhanhui Li <[email protected]>
AuthorDate: Thu Aug 25 13:49:41 2022 +0800

    [ISSUE #4568]Improve performance of consume queue (#4569)
    
    * Two improvements are made. 1, Search from previous minLogicOffset if 
possible; 2, Binary search instead of linear scan
    
    * Add unit test cases, covering the changes
    
    * Optimize terminal condition
    
    * Adapt to latest code
---
 .../org/apache/rocketmq/store/ConsumeQueue.java    | 121 +++++++++++++++++++--
 .../apache/rocketmq/store/ConsumeQueueTest.java    |  48 ++++++++
 2 files changed, 160 insertions(+), 9 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index d0686d916..10049d54f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -57,6 +57,10 @@ public class ConsumeQueue implements ConsumeQueueInterface, 
FileQueueLifeCycle {
     private final String storePath;
     private final int mappedFileSize;
     private long maxPhysicOffset = -1;
+
+    /**
+     * Minimum offset of the consume file queue that points to valid commit 
log record.
+     */
     private volatile long minLogicOffset = 0;
     private ConsumeQueueExt consumeQueueExt = null;
 
@@ -382,26 +386,125 @@ public class ConsumeQueue implements 
ConsumeQueueInterface, FileQueueLifeCycle {
         return cnt;
     }
 
+    /**
+     * Update minLogicOffset such that entries after it would point to valid 
commit log address.
+     *
+     * @param minCommitLogOffset Minimum commit log offset
+     */
     @Override
-    public void correctMinOffset(long phyMinOffset) {
+    public void correctMinOffset(long minCommitLogOffset) {
+        // Check if the consume queue is the state of deprecation.
+        if (minLogicOffset >= mappedFileQueue.getMaxOffset()) {
+            log.info("ConsumeQueue[Topic={}, queue-id={}] contains no valid 
entries", topic, queueId);
+            return;
+        }
+
+        // Check whether the consume queue maps no valid data at all. This 
check may cost 1 IO operation.
+        // The rationale is that consume queue always preserves the last file. 
In case there are many deprecated topics,
+        // This check would save a lot of efforts.
+        MappedFile lastMappedFile = this.mappedFileQueue.getLastMappedFile();
+        if (null == lastMappedFile) {
+            return;
+        }
+
+        SelectMappedBufferResult lastRecord = null;
+        try {
+            int maxReadablePosition = lastMappedFile.getReadPosition();
+            lastRecord = lastMappedFile.selectMappedBuffer(maxReadablePosition 
- ConsumeQueue.CQ_STORE_UNIT_SIZE,
+                ConsumeQueue.CQ_STORE_UNIT_SIZE);
+            if (null != lastRecord) {
+                ByteBuffer buffer = lastRecord.getByteBuffer();
+                long commitLogOffset = buffer.getLong();
+                if (commitLogOffset < minCommitLogOffset) {
+                    // Keep the largest known consume offset, even if this 
consume-queue contains no valid entries at
+                    // all. Let minLogicOffset point to a future slot.
+                    this.minLogicOffset = lastMappedFile.getFileFromOffset() + 
maxReadablePosition;
+                    log.info("ConsumeQueue[topic={}, queue-id={}] contains no 
valid entries. Min-offset is assigned as: {}.",
+                        topic, queueId, getMinOffsetInQueue());
+                    return;
+                }
+            }
+        } finally {
+            if (null != lastRecord) {
+                lastRecord.release();
+            }
+        }
+
         MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();
         long minExtAddr = 1;
         if (mappedFile != null) {
-            SelectMappedBufferResult result = mappedFile.selectMappedBuffer(0);
+            // Search from previous min logical offset. Typically, a consume 
queue file segment contains 300,000 entries
+            // searching from previous position saves significant amount of 
comparisons and IOs
+            boolean intact = true; // Assume previous value is still valid
+            long start = this.minLogicOffset - mappedFile.getFileFromOffset();
+            if (start < 0) {
+                intact = false;
+                start = 0;
+            }
+
+            if (start > mappedFile.getReadPosition()) {
+                log.error("[Bug][InconsistentState] ConsumeQueue file {} 
should have been deleted",
+                    mappedFile.getFileName());
+                return;
+            }
+
+            SelectMappedBufferResult result = 
mappedFile.selectMappedBuffer((int) start);
             if (result == null) {
+                log.warn("[Bug] Failed to scan consume queue entries from file 
on correcting min offset: {}",
+                    mappedFile.getFileName());
                 return;
             }
+
             try {
-                for (int i = 0; i < result.getSize(); i += 
ConsumeQueue.CQ_STORE_UNIT_SIZE) {
-                    long offsetPy = result.getByteBuffer().getLong();
-                    result.getByteBuffer().getInt();
-                    long tagsCode = result.getByteBuffer().getLong();
+                // No valid consume entries
+                if (result.getSize() == 0) {
+                    log.debug("ConsumeQueue[topic={}, queue-id={}] contains no 
valid entries", topic, queueId);
+                    return;
+                }
+
+                ByteBuffer buffer = result.getByteBuffer().slice();
+                // Verify whether the previous value is still valid or not 
before conducting binary search
+                long commitLogOffset = buffer.getLong();
+                if (intact && commitLogOffset >= minCommitLogOffset) {
+                    log.info("Abort correction as previous min-offset points 
to {}, which is greater than {}",
+                        commitLogOffset, minCommitLogOffset);
+                    return;
+                }
+
+                // Binary search between range [previous_min_logic_offset, 
first_file_from_offset + file_size)
+                // Note the consume-queue deletion procedure ensures the last 
entry points to somewhere valid.
+                int low = 0;
+                int high = result.getSize() - ConsumeQueue.CQ_STORE_UNIT_SIZE;
+                while (true) {
+                    if (high - low <= ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+                        break;
+                    }
+                    int mid = (low + high) / 2 / 
ConsumeQueue.CQ_STORE_UNIT_SIZE * ConsumeQueue.CQ_STORE_UNIT_SIZE;
+                    buffer.position(mid);
+                    commitLogOffset = buffer.getLong();
+                    if (commitLogOffset > minCommitLogOffset) {
+                        high = mid;
+                    } else if (commitLogOffset == minCommitLogOffset) {
+                        low = mid;
+                        high = mid;
+                        break;
+                    } else {
+                        low = mid;
+                    }
+                }
+
+                // Examine the last one or two entries
+                for (int i = low; i <= high; i += 
ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+                    buffer.position(i);
+                    long offsetPy = buffer.getLong();
+                    buffer.position(i + 12);
+                    long tagsCode = buffer.getLong();
 
-                    if (offsetPy >= phyMinOffset) {
-                        this.minLogicOffset = mappedFile.getFileFromOffset() + 
i;
+                    if (offsetPy >= minCommitLogOffset) {
+                        this.minLogicOffset = mappedFile.getFileFromOffset() + 
start + i;
                         log.info("Compute logical min offset: {}, topic: {}, 
queueId: {}",
                             this.getMinOffsetInQueue(), this.topic, 
this.queueId);
-                        // This maybe not take effect, when not every consume 
queue has extend file.
+                        // This maybe not take effect, when not every consume 
queue has an extended file.
                         if (isExtAddr(tagsCode)) {
                             minExtAddr = tagsCode;
                         }
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java 
b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
index 6bbe40dae..9cd2e5702 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.store;
 
 import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -36,6 +37,7 @@ import org.apache.rocketmq.store.queue.ReferredIterator;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -429,4 +431,50 @@ public class ConsumeQueueTest {
             UtilAll.deleteFile(new File(storePath));
         }
     }
+
+    @Test
+    public void testCorrectMinOffset() {
+        String topic = "T1";
+        int queueId = 0;
+        MessageStoreConfig storeConfig = new MessageStoreConfig();
+        File tmpDir = new File(System.getProperty("java.io.tmpdir"), 
"test_correct_min_offset");
+        tmpDir.deleteOnExit();
+        storeConfig.setStorePathRootDir(tmpDir.getAbsolutePath());
+        storeConfig.setEnableConsumeQueueExt(false);
+        DefaultMessageStore messageStore = 
Mockito.mock(DefaultMessageStore.class);
+        
Mockito.when(messageStore.getMessageStoreConfig()).thenReturn(storeConfig);
+
+        RunningFlags runningFlags = new RunningFlags();
+        Mockito.when(messageStore.getRunningFlags()).thenReturn(runningFlags);
+
+        StoreCheckpoint storeCheckpoint = Mockito.mock(StoreCheckpoint.class);
+        
Mockito.when(messageStore.getStoreCheckpoint()).thenReturn(storeCheckpoint);
+
+        ConsumeQueue consumeQueue = new ConsumeQueue(topic, queueId, 
storeConfig.getStorePathRootDir(),
+            storeConfig.getMappedFileSizeConsumeQueue(), messageStore);
+
+        int max = 10000;
+        int message_size = 100;
+        for (int i = 0; i < max; ++i) {
+            DispatchRequest dispatchRequest = new DispatchRequest(topic, 
queueId, message_size * i, message_size, 0, 0, i, null, null, 0, 0, null);
+            consumeQueue.putMessagePositionInfoWrapper(dispatchRequest);
+        }
+
+        consumeQueue.setMinLogicOffset(0L);
+        consumeQueue.correctMinOffset(0L);
+        Assert.assertEquals(0, consumeQueue.getMinOffsetInQueue());
+
+        consumeQueue.setMinLogicOffset(100);
+        consumeQueue.correctMinOffset(2000);
+        Assert.assertEquals(20, consumeQueue.getMinOffsetInQueue());
+
+        consumeQueue.setMinLogicOffset((max - 1) * 
ConsumeQueue.CQ_STORE_UNIT_SIZE);
+        consumeQueue.correctMinOffset(max * message_size);
+        Assert.assertEquals(max * ConsumeQueue.CQ_STORE_UNIT_SIZE, 
consumeQueue.getMinLogicOffset());
+
+        consumeQueue.setMinLogicOffset(max * ConsumeQueue.CQ_STORE_UNIT_SIZE);
+        consumeQueue.correctMinOffset(max * message_size);
+        Assert.assertEquals(max * ConsumeQueue.CQ_STORE_UNIT_SIZE, 
consumeQueue.getMinLogicOffset());
+        consumeQueue.destroy();
+    }
 }

Reply via email to