This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d9c75ff1f9 [ISSUE #7872] Fix Tiered Store Query Message Async return
different view each time (#7874)
d9c75ff1f9 is described below
commit d9c75ff1f94d0b25bd79532cc326eff5a994e47d
Author: AYue <[email protected]>
AuthorDate: Tue Mar 26 15:12:38 2024 +0800
[ISSUE #7872] Fix Tiered Store Query Message Async return different view
each time (#7874)
Co-authored-by: ayue <[email protected]>
---
.../tieredstore/core/MessageStoreFetcherImpl.java | 16 +++++++++-------
1 file changed, 9 insertions(+), 7 deletions(-)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
index 2ffad2e3f4..5403ebdc31 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
@@ -394,8 +394,7 @@ public class MessageStoreFetcherImpl implements
MessageStoreFetcher {
messageStore.getIndexService().queryAsync(topic, key, maxCount,
begin, end);
return future.thenCompose(indexItemList -> {
- QueryMessageResult result = new QueryMessageResult();
- List<CompletableFuture<Void>> futureList = new
ArrayList<>(maxCount);
+ List<CompletableFuture<SelectMappedBufferResult>> futureList = new
ArrayList<>(maxCount);
for (IndexItem indexItem : indexItemList) {
if (topicId != indexItem.getTopicId()) {
continue;
@@ -405,17 +404,20 @@ public class MessageStoreFetcherImpl implements
MessageStoreFetcher {
if (flatFile == null) {
continue;
}
- CompletableFuture<Void> getMessageFuture = flatFile
+ CompletableFuture<SelectMappedBufferResult> getMessageFuture =
flatFile
.getCommitLogAsync(indexItem.getOffset(),
indexItem.getSize())
- .thenAccept(messageBuffer -> result.addMessage(
- new SelectMappedBufferResult(
- indexItem.getOffset(), messageBuffer,
indexItem.getSize(), null)));
+ .thenApply(messageBuffer -> new SelectMappedBufferResult(
+ indexItem.getOffset(), messageBuffer,
indexItem.getSize(), null));
futureList.add(getMessageFuture);
if (futureList.size() >= maxCount) {
break;
}
}
- return CompletableFuture.allOf(futureList.toArray(new
CompletableFuture[0])).thenApply(v -> result);
+ return CompletableFuture.allOf(futureList.toArray(new
CompletableFuture[0])).thenApply(v -> {
+ QueryMessageResult result = new QueryMessageResult();
+ futureList.forEach(f -> f.thenAccept(result::addMessage));
+ return result;
+ });
}).whenComplete((result, throwable) -> {
if (result != null) {
log.info("MessageFetcher#queryMessageAsync, " +