This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3949f6938 [ISSUE #5837] Deprecate 
MessageStore#checkInDiskByConsumeOffset (#5840)
3949f6938 is described below

commit 3949f6938e136f35e5e929588ef465ab74e82008
Author: SSpirits <[email protected]>
AuthorDate: Mon Jan 9 16:47:42 2023 +0800

    [ISSUE #5837] Deprecate MessageStore#checkInDiskByConsumeOffset (#5840)
    
    * deprecate MessageStore#checkInDiskByConsumeOffset and add 
checkInMemByConsumeOffset and checkInStoreByConsumeOffset in MessageStore
    
    * skip initializing isLoaded0 method in windows
---
 .../broker/offset/BroadcastOffsetManager.java      |  2 +-
 .../broker/processor/ConsumerManageProcessor.java  |  4 +-
 .../apache/rocketmq/store/DefaultMessageStore.java | 77 +++++++++++++++++-----
 .../org/apache/rocketmq/store/MessageStore.java    | 32 +++++++--
 .../rocketmq/store/SelectMappedBufferResult.java   | 11 +++-
 .../rocketmq/store/logfile/DefaultMappedFile.java  | 55 ++++++++++++++++
 .../apache/rocketmq/store/logfile/MappedFile.java  |  8 +++
 .../store/plugin/AbstractPluginMessageStore.java   | 11 ++++
 .../store/queue/BatchConsumeMessageTest.java       |  4 +-
 9 files changed, 175 insertions(+), 29 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManager.java
index 16e70eed2..9896735dd 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManager.java
@@ -116,7 +116,7 @@ public class BroadcastOffsetManager extends ServiceThread {
                 
brokerController.getConsumerOffsetManager().queryOffset(broadcastGroupId(groupId),
 topic, queueId);
         }
         if (storeOffset < 0) {
-            if 
(!this.brokerController.getMessageStore().checkInDiskByConsumeOffset(topic, 
queueId, 0)) {
+            if 
(this.brokerController.getMessageStore().checkInMemByConsumeOffset(topic, 
queueId, 0, 1)) {
                 storeOffset = 0;
             } else {
                 storeOffset = 
brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId, true);
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 395102c7e..9c6d28d3d 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -328,8 +328,8 @@ public class ConsumerManageProcessor implements 
NettyRequestProcessor {
                 response.setCode(ResponseCode.QUERY_NOT_FOUND);
                 response.setRemark("Not found, do not set to zero, maybe this 
group boot first");
             } else if (minOffset <= 0
-                && 
!this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
-                requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
+                && 
this.brokerController.getMessageStore().checkInMemByConsumeOffset(
+                requestHeader.getTopic(), requestHeader.getQueueId(), 0, 1)) {
                 responseHeader.setOffset(0L);
                 response.setCode(ResponseCode.SUCCESS);
                 response.setRemark(null);
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 93d006245..b52982dc4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -795,13 +795,13 @@ public class DefaultMessageStore implements MessageStore {
                             long offsetPy = cqUnit.getPos();
                             int sizePy = cqUnit.getSize();
 
-                            boolean isInDisk = 
checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
+                            boolean isInMem = 
estimateInMemByCommitOffset(offsetPy, maxOffsetPy);
 
                             if ((cqUnit.getQueueOffset() - offset) * 
consumeQueue.getUnitSize() > maxFilterMessageSize) {
                                 break;
                             }
 
-                            if (this.isTheBatchFull(sizePy, 
cqUnit.getBatchNum(), maxMsgNums, maxPullSize, getResult.getBufferTotalSize(), 
getResult.getMessageCount(), isInDisk)) {
+                            if (this.isTheBatchFull(sizePy, 
cqUnit.getBatchNum(), maxMsgNums, maxPullSize, getResult.getBufferTotalSize(), 
getResult.getMessageCount(), isInMem)) {
                                 break;
                             }
 
@@ -1418,6 +1418,7 @@ public class DefaultMessageStore implements MessageStore {
     }
 
     @Override
+    @Deprecated
     public boolean checkInDiskByConsumeOffset(final String topic, final int 
queueId, long consumeOffset) {
 
         final long maxOffsetPy = this.commitLog.getMaxOffset();
@@ -1428,7 +1429,7 @@ public class DefaultMessageStore implements MessageStore {
 
             if (cqUnit != null) {
                 long offsetPy = cqUnit.getPos();
-                return checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
+                return !estimateInMemByCommitOffset(offsetPy, maxOffsetPy);
             } else {
                 return false;
             }
@@ -1436,6 +1437,38 @@ public class DefaultMessageStore implements MessageStore 
{
         return false;
     }
 
+    @Override
+    public boolean checkInMemByConsumeOffset(final String topic, final int 
queueId, long consumeOffset, int batchSize) {
+        ConsumeQueueInterface consumeQueue = findConsumeQueue(topic, queueId);
+        if (consumeQueue != null) {
+            CqUnit firstCQItem = consumeQueue.get(consumeOffset);
+            if (firstCQItem == null) {
+                return false;
+            }
+            long startOffsetPy = firstCQItem.getPos();
+            if (batchSize <= 1) {
+                int size = firstCQItem.getSize();
+                return checkInMemByCommitOffset(startOffsetPy, size);
+            }
+
+            CqUnit lastCQItem = consumeQueue.get(consumeOffset + batchSize);
+            if (lastCQItem == null) {
+                int size = firstCQItem.getSize();
+                return checkInMemByCommitOffset(startOffsetPy, size);
+            }
+            long endOffsetPy = lastCQItem.getPos();
+            int size = (int) (endOffsetPy - startOffsetPy) + 
lastCQItem.getSize();
+            return checkInMemByCommitOffset(startOffsetPy, size);
+        }
+        return false;
+    }
+
+    @Override
+    public boolean checkInStoreByConsumeOffset(String topic, int queueId, long 
consumeOffset) {
+        long commitLogOffset = getCommitLogOffsetInQueue(topic, queueId, 
consumeOffset);
+        return checkInDiskByCommitOffset(commitLogOffset);
+    }
+
     @Override
     public long dispatchBehindBytes() {
         return this.reputMessageService.behind();
@@ -1600,13 +1633,29 @@ public class DefaultMessageStore implements 
MessageStore {
         return nextOffset;
     }
 
-    private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) 
{
+    private boolean estimateInMemByCommitOffset(long offsetPy, long 
maxOffsetPy) {
         long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * 
(this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
-        return (maxOffsetPy - offsetPy) > memory;
+        return (maxOffsetPy - offsetPy) <= memory;
+    }
+
+    private boolean checkInMemByCommitOffset(long offsetPy, int size) {
+        SelectMappedBufferResult message = this.commitLog.getMessage(offsetPy, 
size);
+        if (message != null) {
+            try {
+                return message.isInMem();
+            } finally {
+                message.release();
+            }
+        }
+        return false;
+    }
+
+    public boolean checkInDiskByCommitOffset(long offsetPy) {
+        return offsetPy >= commitLog.getMinOffset();
     }
 
     private boolean isTheBatchFull(int sizePy, int unitBatchNum, int 
maxMsgNums, long maxMsgSize, int bufferTotal,
-        int messageTotal, boolean isInDisk) {
+        int messageTotal, boolean isInMem) {
 
         if (0 == bufferTotal || 0 == messageTotal) {
             return false;
@@ -1620,25 +1669,19 @@ public class DefaultMessageStore implements 
MessageStore {
             return true;
         }
 
-        if (isInDisk) {
-            if ((bufferTotal + sizePy) > 
this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {
+        if (isInMem) {
+            if ((bufferTotal + sizePy) > 
this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
                 return true;
             }
 
-            if (messageTotal > 
this.messageStoreConfig.getMaxTransferCountOnMessageInDisk() - 1) {
-                return true;
-            }
+            return messageTotal > 
this.messageStoreConfig.getMaxTransferCountOnMessageInMemory() - 1;
         } else {
-            if ((bufferTotal + sizePy) > 
this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
+            if ((bufferTotal + sizePy) > 
this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {
                 return true;
             }
 
-            if (messageTotal > 
this.messageStoreConfig.getMaxTransferCountOnMessageInMemory() - 1) {
-                return true;
-            }
+            return messageTotal > 
this.messageStoreConfig.getMaxTransferCountOnMessageInDisk() - 1;
         }
-
-        return false;
     }
 
     private void deleteFile(final String fileName) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index bbf2056cc..8e86f8abe 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -129,7 +129,7 @@ public interface MessageStore {
 
     /**
      * Asynchronous get message
-     * @see org.apache.rocketmq.store.MessageStore#getMessage(String, String, 
int, long, int, MessageFilter) getMessage
+     * @see #getMessage(String, String, int, long, int, MessageFilter) 
getMessage
      *
      * @param group Consumer group that launches this query.
      * @param topic Topic to query.
@@ -160,7 +160,7 @@ public interface MessageStore {
 
     /**
      * Asynchronous get message
-     * @see org.apache.rocketmq.store.MessageStore#getMessage(String, String, 
int, long, int, int, MessageFilter) getMessage
+     * @see #getMessage(String, String, int, long, int, int, MessageFilter) 
getMessage
      *
      * @param group Consumer group that launches this query.
      * @param topic Topic to query.
@@ -312,7 +312,7 @@ public interface MessageStore {
 
     /**
      * Asynchronous get the store time of the earliest message in this store.
-     * @see org.apache.rocketmq.store.MessageStore#getEarliestMessageTime() 
getEarliestMessageTime
+     * @see #getEarliestMessageTime() getEarliestMessageTime
      *
      * @return timestamp of the earliest message in this store.
      */
@@ -330,7 +330,7 @@ public interface MessageStore {
 
     /**
      * Asynchronous get the store time of the message specified.
-     * @see 
org.apache.rocketmq.store.MessageStore#getMessageStoreTimeStamp(String, int, 
long) getMessageStoreTimeStamp
+     * @see #getMessageStoreTimeStamp(String, int, long) 
getMessageStoreTimeStamp
      *
      * @param topic message topic.
      * @param queueId queue ID.
@@ -396,7 +396,7 @@ public interface MessageStore {
 
     /**
      * Asynchronous query messages by given key.
-     * @see org.apache.rocketmq.store.MessageStore#queryMessage(String, 
String, int, long, long) queryMessage
+     * @see #queryMessage(String, String, int, long, long) queryMessage
      *
      * @param topic topic of the message.
      * @param key message key.
@@ -464,9 +464,31 @@ public interface MessageStore {
      * @param queueId queue ID.
      * @param consumeOffset consume queue offset.
      * @return true if the message is no longer in memory; false otherwise.
+     * @deprecated  As of RIP-57, replaced by {@link 
#checkInMemByConsumeOffset(String, int, long, int)}, see <a 
href="https://github.com/apache/rocketmq/issues/5837">this issue</a> for more 
details
      */
+    @Deprecated
     boolean checkInDiskByConsumeOffset(final String topic, final int queueId, 
long consumeOffset);
 
+    /**
+     * Check if the given message is in the page cache.
+     *
+     * @param topic         topic.
+     * @param queueId       queue ID.
+     * @param consumeOffset consume queue offset.
+     * @return true if the message is in page cache; false otherwise.
+     */
+    boolean checkInMemByConsumeOffset(final String topic, final int queueId, 
long consumeOffset, int batchSize);
+
+    /**
+     * Check if the given message is in store.
+     *
+     * @param topic         topic.
+     * @param queueId       queue ID.
+     * @param consumeOffset consume queue offset.
+     * @return true if the message is in store; false otherwise.
+     */
+    boolean checkInStoreByConsumeOffset(final String topic, final int queueId, 
long consumeOffset);
+
     /**
      * Get number of the bytes that have been stored in commit log and not yet 
dispatched to consume queue.
      *
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java 
b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
index 76919f3d1..317f53605 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
@@ -16,9 +16,8 @@
  */
 package org.apache.rocketmq.store;
 
-import org.apache.rocketmq.store.logfile.MappedFile;
-
 import java.nio.ByteBuffer;
+import org.apache.rocketmq.store.logfile.MappedFile;
 
 public class SelectMappedBufferResult {
 
@@ -67,4 +66,12 @@ public class SelectMappedBufferResult {
     public long getStartOffset() {
         return startOffset;
     }
+
+    public boolean isInMem() {
+        if (mappedFile == null) {
+            return true;
+        }
+        long pos = startOffset - mappedFile.getFileFromOffset();
+        return mappedFile.isLoaded(pos, size);
+    }
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java 
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index 7b56150f6..401c64539 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@ -22,6 +22,8 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
@@ -35,6 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
+import org.apache.commons.lang3.SystemUtils;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -52,10 +55,15 @@ import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.TransientStorePool;
 import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.util.LibC;
+import sun.misc.Unsafe;
 import sun.nio.ch.DirectBuffer;
 
 public class DefaultMappedFile extends AbstractMappedFile {
     public static final int OS_PAGE_SIZE = 1024 * 4;
+    public static final Unsafe UNSAFE = getUnsafe();
+    private static final Method IS_LOADED_METHOD;
+    public static final int UNSAFE_PAGE_SIZE = UNSAFE == null ? OS_PAGE_SIZE : 
UNSAFE.pageSize();
+
     protected static final Logger log = 
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
 
     protected static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new 
AtomicLong(0);
@@ -92,6 +100,18 @@ public class DefaultMappedFile extends AbstractMappedFile {
         WROTE_POSITION_UPDATER = 
AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class, "wrotePosition");
         COMMITTED_POSITION_UPDATER = 
AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class, 
"committedPosition");
         FLUSHED_POSITION_UPDATER = 
AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class, 
"flushedPosition");
+
+        Method isLoaded0method = null;
+        // On the windows platform and openjdk 11 method isLoaded0 always 
returns false.
+        // see 
https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/19fb8f93c59dfd791f62d41f332db9e306bc1422/src/java.base/windows/native/libnio/MappedByteBuffer.c#L34
+        if (!SystemUtils.IS_OS_WINDOWS) {
+            try {
+                isLoaded0method = 
MappedByteBuffer.class.getDeclaredMethod("isLoaded0", long.class, long.class, 
int.class);
+                isLoaded0method.setAccessible(true);
+            } catch (NoSuchMethodException ignore) {
+            }
+        }
+        IS_LOADED_METHOD = isLoaded0method;
     }
 
     public DefaultMappedFile() {
@@ -796,6 +816,41 @@ public class DefaultMappedFile extends AbstractMappedFile {
         return new Itr(startPos);
     }
 
+    public static Unsafe getUnsafe() {
+        try {
+            Field f = Unsafe.class.getDeclaredField("theUnsafe");
+            f.setAccessible(true);
+            return (Unsafe) f.get(null);
+        } catch (Exception ignore) {
+
+        }
+        return null;
+    }
+
+    public static long mappingAddr(long addr) {
+        long offset = addr % UNSAFE_PAGE_SIZE;
+        offset = (offset >= 0) ? offset : (UNSAFE_PAGE_SIZE + offset);
+        return addr - offset;
+    }
+
+    public static int pageCount(long size) {
+        return (int) (size + (long) UNSAFE_PAGE_SIZE - 1L) / UNSAFE_PAGE_SIZE;
+    }
+
+    @Override
+    public boolean isLoaded(long position, int size) {
+        if (IS_LOADED_METHOD == null) {
+            return true;
+        }
+        try {
+            long addr = ((DirectBuffer) mappedByteBuffer).address() + position;
+            return (boolean) IS_LOADED_METHOD.invoke(mappedByteBuffer, 
mappingAddr(addr), size, pageCount(size));
+        } catch (Exception e) {
+            log.info("invoke isLoaded0 of file {} error:", 
file.getAbsolutePath(), e);
+        }
+        return true;
+    }
+
     private class Itr implements Iterator<SelectMappedBufferResult> {
         private int start;
         private int current;
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java 
b/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
index 64e1336e8..dfcf66f08 100644
--- a/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
@@ -360,4 +360,12 @@ public interface MappedFile {
     void init(String fileName, int fileSize, TransientStorePool 
transientStorePool) throws IOException;
 
     Iterator<SelectMappedBufferResult> iterator(int pos);
+
+    /**
+     * Check mapped file is loaded to memory with given position and size
+     * @param position start offset of data
+     * @param size data size
+     * @return data is resided in memory or not
+     */
+    boolean isLoaded(long position, int size);
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
 
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
index 47416a873..77908c5fa 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
@@ -272,10 +272,21 @@ public abstract class AbstractPluginMessageStore 
implements MessageStore {
     }
 
     @Override
+    @Deprecated
     public boolean checkInDiskByConsumeOffset(String topic, int queueId, long 
consumeOffset) {
         return next.checkInDiskByConsumeOffset(topic, queueId, consumeOffset);
     }
 
+    @Override
+    public boolean checkInMemByConsumeOffset(String topic, int queueId, long 
consumeOffset, int batchSize) {
+        return next.checkInMemByConsumeOffset(topic, queueId, consumeOffset, 
batchSize);
+    }
+
+    @Override
+    public boolean checkInStoreByConsumeOffset(String topic, int queueId, long 
consumeOffset) {
+        return next.checkInStoreByConsumeOffset(topic, queueId, consumeOffset);
+    }
+
     @Override
     public long dispatchBehindBytes() {
         return next.dispatchBehindBytes();
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
index 8e8fee278..1a18f1ba1 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
@@ -281,7 +281,7 @@ public class BatchConsumeMessageTest extends QueueTestBase {
         Assert.assertTrue(commitLogOffset <= messageStore.getMaxPhyOffset());
         Assert.assertEquals(commitLogMid, commitLogOffset);
 
-        Assert.assertFalse(messageStore.checkInDiskByConsumeOffset(topic, 0, 
50));
+        Assert.assertTrue(messageStore.checkInMemByConsumeOffset(topic, 0, 50, 
1));
     }
 
     @Test
@@ -331,7 +331,7 @@ public class BatchConsumeMessageTest extends QueueTestBase {
         Assert.assertTrue(commitLogOffset >= messageStore.getMinPhyOffset());
         Assert.assertTrue(commitLogOffset <= messageStore.getMaxPhyOffset());
 
-        Assert.assertFalse(messageStore.checkInDiskByConsumeOffset(topic, 0, 
300));
+        Assert.assertTrue(messageStore.checkInMemByConsumeOffset(topic, 0, 
300, 1));
 
         //get the message Normally
         GetMessageResult getMessageResult = messageStore.getMessage("group", 
topic, 0, 0, 10 * batchNum, null);

Reply via email to