lizhimins commented on code in PR #7594:
URL: https://github.com/apache/rocketmq/pull/7594#discussion_r1413454902
##########
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java:
##########
@@ -303,74 +270,94 @@ public CompletableFuture<GetMessageResult>
getMessageFromCacheAsync(CompositeQue
recordCacheAccess(flatFile, group, queueOffset, resultWrapperList);
- // if cache hit, result will be returned immediately and
asynchronously prefetch messages for later requests
- if (!resultWrapperList.isEmpty()) {
- LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: cache hit:
" +
- "topic: {}, queue: {}, queue offset: {}, max message num:
{}, cache hit num: {}",
- mq.getTopic(), mq.getQueueId(), queueOffset, maxCount,
resultWrapperList.size());
- prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1);
+ if (resultWrapperList.isEmpty()) {
+ // If cache miss, pull messages immediately
+ LOGGER.info("MessageFetcher cache miss, group: {}, topic: {},
queueId: {}, offset: {}, maxCount: {}",
+ group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
+ } else {
+ // If cache hit, return buffer result immediately and
asynchronously prefetch messages
+ LOGGER.debug("MessageFetcher cache hit, group: {}, topic: {},
queueId: {}, offset: {}, maxCount: {}, resultSize: {}",
+ group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount,
resultWrapperList.size());
- GetMessageResult result = new GetMessageResult();
+ GetMessageResultExt result = new GetMessageResultExt();
result.setStatus(GetMessageStatus.FOUND);
result.setMinOffset(flatFile.getConsumeQueueMinOffset());
result.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
result.setNextBeginOffset(queueOffset + resultWrapperList.size());
- resultWrapperList.forEach(wrapper ->
result.addMessage(wrapper.getDuplicateResult(), wrapper.getCurOffset()));
+ resultWrapperList.forEach(wrapper -> result.addMessageExt(
+ wrapper.getDuplicateResult(), wrapper.getOffset(),
wrapper.getTagCode()));
+
+ if (lastGetOffset < result.getMaxOffset()) {
+ this.prefetchMessage(flatFile, group, maxCount, lastGetOffset
+ 1);
+ }
return CompletableFuture.completedFuture(result);
}
- // if cache is miss, immediately pull messages
- LOGGER.info("TieredMessageFetcher#getMessageFromCacheAsync: cache
miss: " +
- "topic: {}, queue: {}, queue offset: {}, max message num: {}",
- mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
-
- CompletableFuture<GetMessageResult> resultFuture;
+ CompletableFuture<GetMessageResultExt> resultFuture;
synchronized (flatFile) {
int batchSize = maxCount * storeConfig.getReadAheadMinFactor();
resultFuture = getMessageFromTieredStoreAsync(flatFile,
queueOffset, batchSize)
- .thenApplyAsync(result -> {
+ .thenApply(result -> {
if (result.getStatus() != GetMessageStatus.FOUND) {
return result;
}
- GetMessageResult newResult = new GetMessageResult();
- newResult.setStatus(GetMessageStatus.FOUND);
-
newResult.setMinOffset(flatFile.getConsumeQueueMinOffset());
-
newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
+ GetMessageResultExt newResult = new GetMessageResultExt();
List<Long> offsetList = result.getMessageQueueOffset();
+ List<Long> tagCodeList = result.getTagCodeList();
List<SelectMappedBufferResult> msgList =
result.getMessageMapedList();
- Long minOffset = offsetList.get(0);
- Long maxOffset = offsetList.get(offsetList.size() - 1);
- int size = offsetList.size();
+
for (int i = 0; i < offsetList.size(); i++) {
- Long offset = offsetList.get(i);
SelectMappedBufferResult msg = msgList.get(i);
- // put message into cache
- SelectMappedBufferResultWrapper resultWrapper =
putMessageToCache(flatFile, offset, msg, minOffset, maxOffset, size, true);
- // try to meet maxCount
+ SelectBufferResultWrapper bufferResult = new
SelectBufferResultWrapper(
+ msg, offsetList.get(i), tagCodeList.get(i), true);
+ this.putMessageToCache(flatFile, bufferResult);
if (newResult.getMessageMapedList().size() < maxCount)
{
-
newResult.addMessage(resultWrapper.getDuplicateResult(), offset);
+ newResult.addMessageExt(msg, offsetList.get(i),
tagCodeList.get(i));
}
}
+
+ newResult.setStatus(GetMessageStatus.FOUND);
+
newResult.setMinOffset(flatFile.getConsumeQueueMinOffset());
+
newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
newResult.setNextBeginOffset(queueOffset +
newResult.getMessageMapedList().size());
return newResult;
- }, TieredStoreExecutor.fetchDataExecutor);
+ });
List<Pair<Integer, CompletableFuture<Long>>> futureList = new
ArrayList<>();
CompletableFuture<Long> inflightRequestFuture =
resultFuture.thenApply(result ->
- result.getStatus() == GetMessageStatus.FOUND ?
result.getMessageQueueOffset().get(result.getMessageQueueOffset().size() - 1) :
-1L);
+ result.getStatus() == GetMessageStatus.FOUND ?
+
result.getMessageQueueOffset().get(result.getMessageQueueOffset().size() - 1) :
-1L);
Review Comment:
![Uploading image.png…]()
This may not be concise enough.
##########
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java:
##########
@@ -303,74 +270,94 @@ public CompletableFuture<GetMessageResult>
getMessageFromCacheAsync(CompositeQue
recordCacheAccess(flatFile, group, queueOffset, resultWrapperList);
- // if cache hit, result will be returned immediately and
asynchronously prefetch messages for later requests
- if (!resultWrapperList.isEmpty()) {
- LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: cache hit:
" +
- "topic: {}, queue: {}, queue offset: {}, max message num:
{}, cache hit num: {}",
- mq.getTopic(), mq.getQueueId(), queueOffset, maxCount,
resultWrapperList.size());
- prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1);
+ if (resultWrapperList.isEmpty()) {
+ // If cache miss, pull messages immediately
+ LOGGER.info("MessageFetcher cache miss, group: {}, topic: {},
queueId: {}, offset: {}, maxCount: {}",
+ group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
+ } else {
+ // If cache hit, return buffer result immediately and
asynchronously prefetch messages
+ LOGGER.debug("MessageFetcher cache hit, group: {}, topic: {},
queueId: {}, offset: {}, maxCount: {}, resultSize: {}",
+ group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount,
resultWrapperList.size());
- GetMessageResult result = new GetMessageResult();
+ GetMessageResultExt result = new GetMessageResultExt();
result.setStatus(GetMessageStatus.FOUND);
result.setMinOffset(flatFile.getConsumeQueueMinOffset());
result.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
result.setNextBeginOffset(queueOffset + resultWrapperList.size());
- resultWrapperList.forEach(wrapper ->
result.addMessage(wrapper.getDuplicateResult(), wrapper.getCurOffset()));
+ resultWrapperList.forEach(wrapper -> result.addMessageExt(
+ wrapper.getDuplicateResult(), wrapper.getOffset(),
wrapper.getTagCode()));
+
+ if (lastGetOffset < result.getMaxOffset()) {
+ this.prefetchMessage(flatFile, group, maxCount, lastGetOffset
+ 1);
+ }
return CompletableFuture.completedFuture(result);
}
- // if cache is miss, immediately pull messages
- LOGGER.info("TieredMessageFetcher#getMessageFromCacheAsync: cache
miss: " +
- "topic: {}, queue: {}, queue offset: {}, max message num: {}",
- mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
-
- CompletableFuture<GetMessageResult> resultFuture;
+ CompletableFuture<GetMessageResultExt> resultFuture;
synchronized (flatFile) {
int batchSize = maxCount * storeConfig.getReadAheadMinFactor();
resultFuture = getMessageFromTieredStoreAsync(flatFile,
queueOffset, batchSize)
- .thenApplyAsync(result -> {
+ .thenApply(result -> {
if (result.getStatus() != GetMessageStatus.FOUND) {
return result;
}
- GetMessageResult newResult = new GetMessageResult();
- newResult.setStatus(GetMessageStatus.FOUND);
-
newResult.setMinOffset(flatFile.getConsumeQueueMinOffset());
-
newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
+ GetMessageResultExt newResult = new GetMessageResultExt();
List<Long> offsetList = result.getMessageQueueOffset();
+ List<Long> tagCodeList = result.getTagCodeList();
List<SelectMappedBufferResult> msgList =
result.getMessageMapedList();
- Long minOffset = offsetList.get(0);
- Long maxOffset = offsetList.get(offsetList.size() - 1);
- int size = offsetList.size();
+
for (int i = 0; i < offsetList.size(); i++) {
- Long offset = offsetList.get(i);
SelectMappedBufferResult msg = msgList.get(i);
- // put message into cache
- SelectMappedBufferResultWrapper resultWrapper =
putMessageToCache(flatFile, offset, msg, minOffset, maxOffset, size, true);
- // try to meet maxCount
+ SelectBufferResultWrapper bufferResult = new
SelectBufferResultWrapper(
+ msg, offsetList.get(i), tagCodeList.get(i), true);
+ this.putMessageToCache(flatFile, bufferResult);
if (newResult.getMessageMapedList().size() < maxCount)
{
-
newResult.addMessage(resultWrapper.getDuplicateResult(), offset);
+ newResult.addMessageExt(msg, offsetList.get(i),
tagCodeList.get(i));
}
}
+
+ newResult.setStatus(GetMessageStatus.FOUND);
+
newResult.setMinOffset(flatFile.getConsumeQueueMinOffset());
+
newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
newResult.setNextBeginOffset(queueOffset +
newResult.getMessageMapedList().size());
return newResult;
- }, TieredStoreExecutor.fetchDataExecutor);
+ });
List<Pair<Integer, CompletableFuture<Long>>> futureList = new
ArrayList<>();
CompletableFuture<Long> inflightRequestFuture =
resultFuture.thenApply(result ->
- result.getStatus() == GetMessageStatus.FOUND ?
result.getMessageQueueOffset().get(result.getMessageQueueOffset().size() - 1) :
-1L);
+ result.getStatus() == GetMessageStatus.FOUND ?
+
result.getMessageQueueOffset().get(result.getMessageQueueOffset().size() - 1) :
-1L);
Review Comment:
![Uploading image.png…]()
This may not be concise enough.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]