This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 3949f6938 [ISSUE #5837] Deprecate
MessageStore#checkInDiskByConsumeOffset (#5840)
3949f6938 is described below
commit 3949f6938e136f35e5e929588ef465ab74e82008
Author: SSpirits <[email protected]>
AuthorDate: Mon Jan 9 16:47:42 2023 +0800
[ISSUE #5837] Deprecate MessageStore#checkInDiskByConsumeOffset (#5840)
* deprecate MessageStore#checkInDiskByConsumeOffset and add
checkInMemByConsumeOffset and checkInStoreByConsumeOffset in MessageStore
* skip initializing isLoaded0 method in windows
---
.../broker/offset/BroadcastOffsetManager.java | 2 +-
.../broker/processor/ConsumerManageProcessor.java | 4 +-
.../apache/rocketmq/store/DefaultMessageStore.java | 77 +++++++++++++++++-----
.../org/apache/rocketmq/store/MessageStore.java | 32 +++++++--
.../rocketmq/store/SelectMappedBufferResult.java | 11 +++-
.../rocketmq/store/logfile/DefaultMappedFile.java | 55 ++++++++++++++++
.../apache/rocketmq/store/logfile/MappedFile.java | 8 +++
.../store/plugin/AbstractPluginMessageStore.java | 11 ++++
.../store/queue/BatchConsumeMessageTest.java | 4 +-
9 files changed, 175 insertions(+), 29 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManager.java
index 16e70eed2..9896735dd 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManager.java
@@ -116,7 +116,7 @@ public class BroadcastOffsetManager extends ServiceThread {
brokerController.getConsumerOffsetManager().queryOffset(broadcastGroupId(groupId),
topic, queueId);
}
if (storeOffset < 0) {
- if
(!this.brokerController.getMessageStore().checkInDiskByConsumeOffset(topic,
queueId, 0)) {
+ if
(this.brokerController.getMessageStore().checkInMemByConsumeOffset(topic,
queueId, 0, 1)) {
storeOffset = 0;
} else {
storeOffset =
brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId, true);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 395102c7e..9c6d28d3d 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -328,8 +328,8 @@ public class ConsumerManageProcessor implements
NettyRequestProcessor {
response.setCode(ResponseCode.QUERY_NOT_FOUND);
response.setRemark("Not found, do not set to zero, maybe this
group boot first");
} else if (minOffset <= 0
- &&
!this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
- requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
+ &&
this.brokerController.getMessageStore().checkInMemByConsumeOffset(
+ requestHeader.getTopic(), requestHeader.getQueueId(), 0, 1)) {
responseHeader.setOffset(0L);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 93d006245..b52982dc4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -795,13 +795,13 @@ public class DefaultMessageStore implements MessageStore {
long offsetPy = cqUnit.getPos();
int sizePy = cqUnit.getSize();
- boolean isInDisk =
checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
+ boolean isInMem =
estimateInMemByCommitOffset(offsetPy, maxOffsetPy);
if ((cqUnit.getQueueOffset() - offset) *
consumeQueue.getUnitSize() > maxFilterMessageSize) {
break;
}
- if (this.isTheBatchFull(sizePy,
cqUnit.getBatchNum(), maxMsgNums, maxPullSize, getResult.getBufferTotalSize(),
getResult.getMessageCount(), isInDisk)) {
+ if (this.isTheBatchFull(sizePy,
cqUnit.getBatchNum(), maxMsgNums, maxPullSize, getResult.getBufferTotalSize(),
getResult.getMessageCount(), isInMem)) {
break;
}
@@ -1418,6 +1418,7 @@ public class DefaultMessageStore implements MessageStore {
}
@Override
+ @Deprecated
public boolean checkInDiskByConsumeOffset(final String topic, final int
queueId, long consumeOffset) {
final long maxOffsetPy = this.commitLog.getMaxOffset();
@@ -1428,7 +1429,7 @@ public class DefaultMessageStore implements MessageStore {
if (cqUnit != null) {
long offsetPy = cqUnit.getPos();
- return checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
+ return !estimateInMemByCommitOffset(offsetPy, maxOffsetPy);
} else {
return false;
}
@@ -1436,6 +1437,38 @@ public class DefaultMessageStore implements MessageStore
{
return false;
}
+ @Override
+ public boolean checkInMemByConsumeOffset(final String topic, final int
queueId, long consumeOffset, int batchSize) {
+ ConsumeQueueInterface consumeQueue = findConsumeQueue(topic, queueId);
+ if (consumeQueue != null) {
+ CqUnit firstCQItem = consumeQueue.get(consumeOffset);
+ if (firstCQItem == null) {
+ return false;
+ }
+ long startOffsetPy = firstCQItem.getPos();
+ if (batchSize <= 1) {
+ int size = firstCQItem.getSize();
+ return checkInMemByCommitOffset(startOffsetPy, size);
+ }
+
+ CqUnit lastCQItem = consumeQueue.get(consumeOffset + batchSize);
+ if (lastCQItem == null) {
+ int size = firstCQItem.getSize();
+ return checkInMemByCommitOffset(startOffsetPy, size);
+ }
+ long endOffsetPy = lastCQItem.getPos();
+ int size = (int) (endOffsetPy - startOffsetPy) +
lastCQItem.getSize();
+ return checkInMemByCommitOffset(startOffsetPy, size);
+ }
+ return false;
+ }
+
+ @Override
+ public boolean checkInStoreByConsumeOffset(String topic, int queueId, long
consumeOffset) {
+ long commitLogOffset = getCommitLogOffsetInQueue(topic, queueId,
consumeOffset);
+ return checkInDiskByCommitOffset(commitLogOffset);
+ }
+
@Override
public long dispatchBehindBytes() {
return this.reputMessageService.behind();
@@ -1600,13 +1633,29 @@ public class DefaultMessageStore implements
MessageStore {
return nextOffset;
}
- private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy)
{
+ private boolean estimateInMemByCommitOffset(long offsetPy, long
maxOffsetPy) {
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE *
(this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
- return (maxOffsetPy - offsetPy) > memory;
+ return (maxOffsetPy - offsetPy) <= memory;
+ }
+
+ private boolean checkInMemByCommitOffset(long offsetPy, int size) {
+ SelectMappedBufferResult message = this.commitLog.getMessage(offsetPy,
size);
+ if (message != null) {
+ try {
+ return message.isInMem();
+ } finally {
+ message.release();
+ }
+ }
+ return false;
+ }
+
+ public boolean checkInDiskByCommitOffset(long offsetPy) {
+ return offsetPy >= commitLog.getMinOffset();
}
private boolean isTheBatchFull(int sizePy, int unitBatchNum, int
maxMsgNums, long maxMsgSize, int bufferTotal,
- int messageTotal, boolean isInDisk) {
+ int messageTotal, boolean isInMem) {
if (0 == bufferTotal || 0 == messageTotal) {
return false;
@@ -1620,25 +1669,19 @@ public class DefaultMessageStore implements
MessageStore {
return true;
}
- if (isInDisk) {
- if ((bufferTotal + sizePy) >
this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {
+ if (isInMem) {
+ if ((bufferTotal + sizePy) >
this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
return true;
}
- if (messageTotal >
this.messageStoreConfig.getMaxTransferCountOnMessageInDisk() - 1) {
- return true;
- }
+ return messageTotal >
this.messageStoreConfig.getMaxTransferCountOnMessageInMemory() - 1;
} else {
- if ((bufferTotal + sizePy) >
this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
+ if ((bufferTotal + sizePy) >
this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {
return true;
}
- if (messageTotal >
this.messageStoreConfig.getMaxTransferCountOnMessageInMemory() - 1) {
- return true;
- }
+ return messageTotal >
this.messageStoreConfig.getMaxTransferCountOnMessageInDisk() - 1;
}
-
- return false;
}
private void deleteFile(final String fileName) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index bbf2056cc..8e86f8abe 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -129,7 +129,7 @@ public interface MessageStore {
/**
* Asynchronous get message
- * @see org.apache.rocketmq.store.MessageStore#getMessage(String, String,
int, long, int, MessageFilter) getMessage
+ * @see #getMessage(String, String, int, long, int, MessageFilter)
getMessage
*
* @param group Consumer group that launches this query.
* @param topic Topic to query.
@@ -160,7 +160,7 @@ public interface MessageStore {
/**
* Asynchronous get message
- * @see org.apache.rocketmq.store.MessageStore#getMessage(String, String,
int, long, int, int, MessageFilter) getMessage
+ * @see #getMessage(String, String, int, long, int, int, MessageFilter)
getMessage
*
* @param group Consumer group that launches this query.
* @param topic Topic to query.
@@ -312,7 +312,7 @@ public interface MessageStore {
/**
* Asynchronous get the store time of the earliest message in this store.
- * @see org.apache.rocketmq.store.MessageStore#getEarliestMessageTime()
getEarliestMessageTime
+ * @see #getEarliestMessageTime() getEarliestMessageTime
*
* @return timestamp of the earliest message in this store.
*/
@@ -330,7 +330,7 @@ public interface MessageStore {
/**
* Asynchronous get the store time of the message specified.
- * @see
org.apache.rocketmq.store.MessageStore#getMessageStoreTimeStamp(String, int,
long) getMessageStoreTimeStamp
+ * @see #getMessageStoreTimeStamp(String, int, long)
getMessageStoreTimeStamp
*
* @param topic message topic.
* @param queueId queue ID.
@@ -396,7 +396,7 @@ public interface MessageStore {
/**
* Asynchronous query messages by given key.
- * @see org.apache.rocketmq.store.MessageStore#queryMessage(String,
String, int, long, long) queryMessage
+ * @see #queryMessage(String, String, int, long, long) queryMessage
*
* @param topic topic of the message.
* @param key message key.
@@ -464,9 +464,31 @@ public interface MessageStore {
* @param queueId queue ID.
* @param consumeOffset consume queue offset.
* @return true if the message is no longer in memory; false otherwise.
+ * @deprecated As of RIP-57, replaced by {@link
#checkInMemByConsumeOffset(String, int, long, int)}, see <a
href="https://github.com/apache/rocketmq/issues/5837">this issue</a> for more
details
*/
+ @Deprecated
boolean checkInDiskByConsumeOffset(final String topic, final int queueId,
long consumeOffset);
+ /**
+ * Check if the given message is in the page cache.
+ *
+ * @param topic topic.
+ * @param queueId queue ID.
+ * @param consumeOffset consume queue offset.
+ * @return true if the message is in page cache; false otherwise.
+ */
+ boolean checkInMemByConsumeOffset(final String topic, final int queueId,
long consumeOffset, int batchSize);
+
+ /**
+ * Check if the given message is in store.
+ *
+ * @param topic topic.
+ * @param queueId queue ID.
+ * @param consumeOffset consume queue offset.
+ * @return true if the message is in store; false otherwise.
+ */
+ boolean checkInStoreByConsumeOffset(final String topic, final int queueId,
long consumeOffset);
+
/**
* Get number of the bytes that have been stored in commit log and not yet
dispatched to consume queue.
*
diff --git
a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
index 76919f3d1..317f53605 100644
---
a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
+++
b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
@@ -16,9 +16,8 @@
*/
package org.apache.rocketmq.store;
-import org.apache.rocketmq.store.logfile.MappedFile;
-
import java.nio.ByteBuffer;
+import org.apache.rocketmq.store.logfile.MappedFile;
public class SelectMappedBufferResult {
@@ -67,4 +66,12 @@ public class SelectMappedBufferResult {
public long getStartOffset() {
return startOffset;
}
+
+ public boolean isInMem() {
+ if (mappedFile == null) {
+ return true;
+ }
+ long pos = startOffset - mappedFile.getFileFromOffset();
+ return mappedFile.isLoaded(pos, size);
+ }
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index 7b56150f6..401c64539 100644
---
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@ -22,6 +22,8 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
@@ -35,6 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
+import org.apache.commons.lang3.SystemUtils;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageExt;
@@ -52,10 +55,15 @@ import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.TransientStorePool;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.util.LibC;
+import sun.misc.Unsafe;
import sun.nio.ch.DirectBuffer;
public class DefaultMappedFile extends AbstractMappedFile {
public static final int OS_PAGE_SIZE = 1024 * 4;
+ public static final Unsafe UNSAFE = getUnsafe();
+ private static final Method IS_LOADED_METHOD;
+ public static final int UNSAFE_PAGE_SIZE = UNSAFE == null ? OS_PAGE_SIZE :
UNSAFE.pageSize();
+
protected static final Logger log =
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
protected static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new
AtomicLong(0);
@@ -92,6 +100,18 @@ public class DefaultMappedFile extends AbstractMappedFile {
WROTE_POSITION_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class, "wrotePosition");
COMMITTED_POSITION_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class,
"committedPosition");
FLUSHED_POSITION_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class,
"flushedPosition");
+
+ Method isLoaded0method = null;
+ // On the windows platform and openjdk 11 method isLoaded0 always
returns false.
+ // see
https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/19fb8f93c59dfd791f62d41f332db9e306bc1422/src/java.base/windows/native/libnio/MappedByteBuffer.c#L34
+ if (!SystemUtils.IS_OS_WINDOWS) {
+ try {
+ isLoaded0method =
MappedByteBuffer.class.getDeclaredMethod("isLoaded0", long.class, long.class,
int.class);
+ isLoaded0method.setAccessible(true);
+ } catch (NoSuchMethodException ignore) {
+ }
+ }
+ IS_LOADED_METHOD = isLoaded0method;
}
public DefaultMappedFile() {
@@ -796,6 +816,41 @@ public class DefaultMappedFile extends AbstractMappedFile {
return new Itr(startPos);
}
+ public static Unsafe getUnsafe() {
+ try {
+ Field f = Unsafe.class.getDeclaredField("theUnsafe");
+ f.setAccessible(true);
+ return (Unsafe) f.get(null);
+ } catch (Exception ignore) {
+
+ }
+ return null;
+ }
+
+ public static long mappingAddr(long addr) {
+ long offset = addr % UNSAFE_PAGE_SIZE;
+ offset = (offset >= 0) ? offset : (UNSAFE_PAGE_SIZE + offset);
+ return addr - offset;
+ }
+
+ public static int pageCount(long size) {
+ return (int) (size + (long) UNSAFE_PAGE_SIZE - 1L) / UNSAFE_PAGE_SIZE;
+ }
+
+ @Override
+ public boolean isLoaded(long position, int size) {
+ if (IS_LOADED_METHOD == null) {
+ return true;
+ }
+ try {
+ long addr = ((DirectBuffer) mappedByteBuffer).address() + position;
+ return (boolean) IS_LOADED_METHOD.invoke(mappedByteBuffer,
mappingAddr(addr), size, pageCount(size));
+ } catch (Exception e) {
+ log.info("invoke isLoaded0 of file {} error:",
file.getAbsolutePath(), e);
+ }
+ return true;
+ }
+
private class Itr implements Iterator<SelectMappedBufferResult> {
private int start;
private int current;
diff --git
a/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
b/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
index 64e1336e8..dfcf66f08 100644
--- a/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
@@ -360,4 +360,12 @@ public interface MappedFile {
void init(String fileName, int fileSize, TransientStorePool
transientStorePool) throws IOException;
Iterator<SelectMappedBufferResult> iterator(int pos);
+
+ /**
+ * Check mapped file is loaded to memory with given position and size
+ * @param position start offset of data
+ * @param size data size
+ * @return data is resided in memory or not
+ */
+ boolean isLoaded(long position, int size);
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
index 47416a873..77908c5fa 100644
---
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
+++
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
@@ -272,10 +272,21 @@ public abstract class AbstractPluginMessageStore
implements MessageStore {
}
@Override
+ @Deprecated
public boolean checkInDiskByConsumeOffset(String topic, int queueId, long
consumeOffset) {
return next.checkInDiskByConsumeOffset(topic, queueId, consumeOffset);
}
+ @Override
+ public boolean checkInMemByConsumeOffset(String topic, int queueId, long
consumeOffset, int batchSize) {
+ return next.checkInMemByConsumeOffset(topic, queueId, consumeOffset,
batchSize);
+ }
+
+ @Override
+ public boolean checkInStoreByConsumeOffset(String topic, int queueId, long
consumeOffset) {
+ return next.checkInStoreByConsumeOffset(topic, queueId, consumeOffset);
+ }
+
@Override
public long dispatchBehindBytes() {
return next.dispatchBehindBytes();
diff --git
a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
index 8e8fee278..1a18f1ba1 100644
---
a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
@@ -281,7 +281,7 @@ public class BatchConsumeMessageTest extends QueueTestBase {
Assert.assertTrue(commitLogOffset <= messageStore.getMaxPhyOffset());
Assert.assertEquals(commitLogMid, commitLogOffset);
- Assert.assertFalse(messageStore.checkInDiskByConsumeOffset(topic, 0,
50));
+ Assert.assertTrue(messageStore.checkInMemByConsumeOffset(topic, 0, 50,
1));
}
@Test
@@ -331,7 +331,7 @@ public class BatchConsumeMessageTest extends QueueTestBase {
Assert.assertTrue(commitLogOffset >= messageStore.getMinPhyOffset());
Assert.assertTrue(commitLogOffset <= messageStore.getMaxPhyOffset());
- Assert.assertFalse(messageStore.checkInDiskByConsumeOffset(topic, 0,
300));
+ Assert.assertTrue(messageStore.checkInMemByConsumeOffset(topic, 0,
300, 1));
//get the message Normally
GetMessageResult getMessageResult = messageStore.getMessage("group",
topic, 0, 0, 10 * batchNum, null);