This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 2089abd3be [ISSUE #9080] Fix transfer logic when get large messages
from cache in tiered storage (#9079)
2089abd3be is described below
commit 2089abd3be5c4f34d2a981caf32747a1e624f2ed
Author: yuz10 <[email protected]>
AuthorDate: Thu Dec 26 14:49:00 2024 +0800
[ISSUE #9080] Fix transfer logic when get large messages from cache in
tiered storage (#9079)
---
.../apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java | 5 +++++
.../rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java | 1 +
2 files changed, 6 insertions(+)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
index 7f79dbcd98..e94185626a 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
@@ -56,6 +56,7 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher {
private final String brokerName;
private final MetadataStore metadataStore;
private final MessageStoreConfig storeConfig;
+ private final org.apache.rocketmq.store.config.MessageStoreConfig messageStoreConfig;
private final TieredMessageStore messageStore;
private final IndexService indexService;
private final FlatFileStore flatFileStore;
@@ -71,6 +72,7 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher {
FlatFileStore flatFileStore, IndexService indexService) {
this.storeConfig = storeConfig;
+ this.messageStoreConfig = messageStore.getMessageStoreConfig();
this.brokerName = storeConfig.getBrokerName();
this.flatFileStore = flatFileStore;
this.messageStore = messageStore;
@@ -148,6 +150,9 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher {
if (result.getMessageCount() == maxCount) {
break;
}
+ if (result.getBufferTotalSize() >= messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
+ break;
+ }
}
result.setStatus(result.getMessageCount() > 0 ? GetMessageStatus.FOUND : GetMessageStatus.NO_MATCHED_MESSAGE);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java
index 6b96076948..7a43e1ede8 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java
@@ -106,6 +106,7 @@ public class MessageStoreDispatcherImplTest {
Mockito.when(messageStore.getStoreExecutor()).thenReturn(executor);
Mockito.when(messageStore.getFlatFileStore()).thenReturn(fileStore);
Mockito.when(messageStore.getIndexService()).thenReturn(indexService);
+ Mockito.when(messageStore.getMessageStoreConfig()).thenReturn(new org.apache.rocketmq.store.config.MessageStoreConfig());
// mock message
ByteBuffer buffer = MessageFormatUtilTest.buildMockedMessageBuffer();