This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c96a0b566 [ISSUE #6933] Support delete expired or damaged file in tiered storage and optimize fetch code (#6952)
c96a0b566 is described below
commit c96a0b56658b48b17b762a1d2894e6d0576acad1
Author: lizhimins <[email protected]>
AuthorDate: Tue Jun 27 17:53:43 2023 +0800
[ISSUE #6933] Support delete expired or damaged file in tiered storage and optimize fetch code (#6952)
If the cq dispatch offset is smaller than the local store min offset, run the self-healing logic for tiered storage and rebuild the flat file automatically.
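For illustration, a minimal, self-contained sketch of the self-healing decision this change introduces in TieredDispatcher#dispatchFlatFile (a hypothetical helper that only mirrors the logic shown in the hunk below; it is not code from this commit):

    // Sketch: when the dispatch offset has fallen behind the local consume queue,
    // the stale flat file is destroyed and rebuilt starting from the queue's max offset.
    final class DispatchOffsetCheck {
        /** Returns the offset a rebuilt flat file should start from, or -1 if no rebuild is needed. */
        static long rebuildOffsetIfLagging(long dispatchOffset, long minOffsetInQueue, long maxOffsetInQueue) {
            if (dispatchOffset < minOffsetInQueue) {
                // the earliest messages may already be lost, so skip ahead instead of re-dispatching
                return maxOffsetInQueue;
            }
            return -1L;
        }
    }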
---
.../rocketmq/tieredstore/MessageStoreFetcher.java | 80 +++++++++
.../rocketmq/tieredstore/TieredDispatcher.java | 15 +-
.../rocketmq/tieredstore/TieredMessageFetcher.java | 196 +++++++++++++--------
.../rocketmq/tieredstore/file/TieredFlatFile.java | 10 +-
.../rocketmq/tieredstore/file/TieredIndexFile.java | 17 +-
.../metrics/TieredStoreMetricsManager.java | 4 +-
.../inputstream/TieredCommitLogInputStream.java | 3 +-
.../tieredstore/TieredMessageFetcherTest.java | 16 +-
8 files changed, 239 insertions(+), 102 deletions(-)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java
new file mode 100644
index 000000000..f4d576d29
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.MessageFilter;
+import org.apache.rocketmq.store.QueryMessageResult;
+import org.apache.rocketmq.tieredstore.common.BoundaryType;
+
+public interface MessageStoreFetcher {
+
+    /**
+     * Asynchronously get the store time of the earliest message in this store.
+     *
+     * @return timestamp of the earliest message in this store.
+     */
+    CompletableFuture<Long> getEarliestMessageTimeAsync(String topic, int queueId);
+
+    /**
+     * Asynchronously get the store time of the specified message.
+     *
+     * @param topic Message topic.
+     * @param queueId Queue ID.
+     * @param consumeQueueOffset Consume queue offset.
+     * @return store timestamp of the message.
+     */
+    CompletableFuture<Long> getMessageStoreTimeStampAsync(String topic, int queueId, long consumeQueueOffset);
+
+    /**
+     * Look up the physical offset of the message whose store timestamp matches the one specified.
+     *
+     * @param topic Topic of the message.
+     * @param queueId Queue ID.
+     * @param timestamp Timestamp to look up.
+     * @return the physical offset that matches.
+     */
+    long getOffsetInQueueByTime(String topic, int queueId, long timestamp, BoundaryType type);
+
+    /**
+     * Asynchronously get messages.
+     *
+     * @param group Consumer group that launches this query.
+     * @param topic Topic to query.
+     * @param queueId Queue ID to query.
+     * @param offset Logical offset to start from.
+     * @param maxCount Maximum count of messages to query.
+     * @param messageFilter Message filter used to screen desired messages.
+     * @return Matched messages.
+     */
+    CompletableFuture<GetMessageResult> getMessageAsync(
+        String group, String topic, int queueId, long offset, int maxCount, MessageFilter messageFilter);
+
+    /**
+     * Asynchronously query messages by the given key.
+     *
+     * @param topic Topic of the message.
+     * @param key Message key.
+     * @param maxCount Maximum count of messages returned.
+     * @param begin Begin timestamp.
+     * @param end End timestamp.
+     */
+    CompletableFuture<QueryMessageResult> queryMessageAsync(
+        String topic, String key, int maxCount, long begin, long end);
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index 0d89d305b..2a8e2ed71 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -260,8 +260,16 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
                logger.warn("TieredDispatcher#dispatchFlatFile: dispatch offset is too small, " +
                    "topic: {}, queueId: {}, dispatch offset: {}, local cq offset range {}-{}",
                    topic, queueId, dispatchOffset, minOffsetInQueue, maxOffsetInQueue);
-                flatFile.initOffset(minOffsetInQueue);
-                dispatchOffset = minOffsetInQueue;
+
+                // when dispatch offset is smaller than min offset in local cq,
+                // some of the earliest messages may already be lost at this point
+                tieredFlatFileManager.destroyCompositeFile(flatFile.getMessageQueue());
+                CompositeQueueFlatFile newFlatFile =
+                    tieredFlatFileManager.getOrCreateFlatFileIfAbsent(new MessageQueue(topic, brokerName, queueId));
+                if (newFlatFile != null) {
+                    newFlatFile.initOffset(maxOffsetInQueue);
+                }
+                return;
            }
beforeOffset = dispatchOffset;
@@ -290,7 +298,8 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
                    logger.error("TieredDispatcher#dispatchFlatFile: get message from next store failed, " +
                        "topic: {}, queueId: {}, commitLog offset: {}, size: {}",
                        topic, queueId, commitLogOffset, size);
-                    break;
+                    // do not dispatch immediately
+                    return;
}
// append commitlog will increase dispatch offset here
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
index 39a2e2aff..8802a73a3 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
@@ -60,52 +60,49 @@ import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil;
 import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
-public class TieredMessageFetcher {
+public class TieredMessageFetcher implements MessageStoreFetcher {
+
    private static final Logger LOGGER = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
-    private final TieredMessageStoreConfig storeConfig;
    private final String brokerName;
-    private TieredMetadataStore metadataStore;
+    private final TieredMessageStoreConfig storeConfig;
+    private final TieredMetadataStore metadataStore;
    private final TieredFlatFileManager flatFileManager;
-    protected final Cache<MessageCacheKey, SelectMappedBufferResultWrapper> readAheadCache;
+    private final Cache<MessageCacheKey, SelectMappedBufferResultWrapper> readAheadCache;
    public TieredMessageFetcher(TieredMessageStoreConfig storeConfig) {
        this.storeConfig = storeConfig;
        this.brokerName = storeConfig.getBrokerName();
+        this.metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
        this.flatFileManager = TieredFlatFileManager.getInstance(storeConfig);
-        this.readAheadCache = Caffeine.newBuilder()
+        this.readAheadCache = this.initCache(storeConfig);
+    }
+
+    private Cache<MessageCacheKey, SelectMappedBufferResultWrapper> initCache(TieredMessageStoreConfig storeConfig) {
+        long memoryMaxSize =
+            (long) (Runtime.getRuntime().maxMemory() * storeConfig.getReadAheadCacheSizeThresholdRate());
+
+        return Caffeine.newBuilder()
            .scheduler(Scheduler.systemScheduler())
-            // TODO adjust expire time dynamically
            .expireAfterWrite(storeConfig.getReadAheadCacheExpireDuration(), TimeUnit.MILLISECONDS)
-            .maximumWeight((long) (Runtime.getRuntime().maxMemory() * storeConfig.getReadAheadCacheSizeThresholdRate()))
+            .maximumWeight(memoryMaxSize)
+            // Using the buffer size of messages to calculate memory usage
            .weigher((MessageCacheKey key, SelectMappedBufferResultWrapper msg) -> msg.getDuplicateResult().getSize())
            .recordStats()
            .build();
-        try {
-            this.metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
-        } catch (Exception ignored) {
-
-        }
    }
-    public Cache<MessageCacheKey, SelectMappedBufferResultWrapper> getReadAheadCache() {
-        return readAheadCache;
-    }
+    protected SelectMappedBufferResultWrapper putMessageToCache(CompositeFlatFile flatFile,
+        long queueOffset, SelectMappedBufferResult result, long minOffset, long maxOffset, int size) {
-    public CompletableFuture<GetMessageResult> getMessageFromCacheAsync(CompositeQueueFlatFile flatFile,
-        String group, long queueOffset, int maxMsgNums) {
-        // wait for inflight request by default
-        return getMessageFromCacheAsync(flatFile, group, queueOffset, maxMsgNums, true);
+        return putMessageToCache(flatFile, queueOffset, result, minOffset, maxOffset, size, false);
    }
-    protected SelectMappedBufferResultWrapper putMessageToCache(CompositeFlatFile flatFile, long queueOffset,
-        SelectMappedBufferResult msg, long minOffset, long maxOffset, int size) {
-        return putMessageToCache(flatFile, queueOffset, msg, minOffset, maxOffset, size, false);
-    }
+    protected SelectMappedBufferResultWrapper putMessageToCache(CompositeFlatFile flatFile,
+        long queueOffset, SelectMappedBufferResult result, long minOffset, long maxOffset, int size, boolean used) {
-    protected SelectMappedBufferResultWrapper putMessageToCache(CompositeFlatFile flatFile, long queueOffset,
-        SelectMappedBufferResult msg, long minOffset, long maxOffset, int size, boolean used) {
-        SelectMappedBufferResultWrapper wrapper = new SelectMappedBufferResultWrapper(msg, queueOffset, minOffset, maxOffset, size);
+        SelectMappedBufferResultWrapper wrapper =
+            new SelectMappedBufferResultWrapper(result, queueOffset, minOffset, maxOffset, size);
        if (used) {
            wrapper.addAccessCount();
        }
@@ -113,9 +110,20 @@ public class TieredMessageFetcher {
return wrapper;
}
+    // Visible for metrics monitoring
+    public Cache<MessageCacheKey, SelectMappedBufferResultWrapper> getMessageCache() {
+        return readAheadCache;
+    }
+
+    // Wait for the in-flight request to complete by default
+    protected CompletableFuture<GetMessageResult> getMessageFromCacheAsync(
+        CompositeQueueFlatFile flatFile, String group, long queueOffset, int maxCount) {
+
+        return getMessageFromCacheAsync(flatFile, group, queueOffset, maxCount, true);
+    }
+
    @Nullable
-    protected SelectMappedBufferResultWrapper getMessageFromCache(CompositeFlatFile flatFile,
-        long queueOffset) {
+    protected SelectMappedBufferResultWrapper getMessageFromCache(CompositeFlatFile flatFile, long queueOffset) {
MessageCacheKey cacheKey = new MessageCacheKey(flatFile, queueOffset);
return readAheadCache.getIfPresent(cacheKey);
}
@@ -135,21 +143,21 @@ public class TieredMessageFetcher {
}
}
-    private void preFetchMessage(CompositeQueueFlatFile flatFile, String group, int maxMsgNums,
-        long nextBeginOffset) {
-        if (maxMsgNums == 1 || flatFile.getReadAheadFactor() == 1) {
+    private void prefetchMessage(CompositeQueueFlatFile flatFile, String group, int maxCount, long nextBeginOffset) {
+        if (maxCount == 1 || flatFile.getReadAheadFactor() == 1) {
            return;
        }
+
        MessageQueue mq = flatFile.getMessageQueue();
-        // make sure there is only one inflight request per group and request range
-        int prefetchBatchSize = Math.min(maxMsgNums * flatFile.getReadAheadFactor(), storeConfig.getReadAheadMessageCountThreshold());
+        // make sure there is only one request per group and request range
+        int prefetchBatchSize = Math.min(maxCount * flatFile.getReadAheadFactor(), storeConfig.getReadAheadMessageCountThreshold());
        InFlightRequestFuture inflightRequest = flatFile.getInflightRequest(group, nextBeginOffset, prefetchBatchSize);
        if (!inflightRequest.isAllDone()) {
            return;
        }
        synchronized (flatFile) {
-            inflightRequest = flatFile.getInflightRequest(nextBeginOffset, maxMsgNums);
+            inflightRequest = flatFile.getInflightRequest(nextBeginOffset, maxCount);
if (!inflightRequest.isAllDone()) {
return;
}
@@ -161,7 +169,10 @@ public class TieredMessageFetcher {
            int cacheRemainCount = (int) (maxOffsetOfLastRequest - nextBeginOffset);
            LOGGER.debug("TieredMessageFetcher#preFetchMessage: group={}, nextBeginOffset={}, maxOffsetOfLastRequest={}, lastRequestIsExpired={}, cacheRemainCount={}",
                group, nextBeginOffset, maxOffsetOfLastRequest, lastRequestIsExpired, cacheRemainCount);
-            if (lastRequestIsExpired || maxOffsetOfLastRequest != -1L && nextBeginOffset >= inflightRequest.getStartOffset()) {
+
+            if (lastRequestIsExpired
+                || maxOffsetOfLastRequest != -1L && nextBeginOffset >= inflightRequest.getStartOffset()) {
+
long queueOffset;
if (lastRequestIsExpired) {
queueOffset = nextBeginOffset;
@@ -171,35 +182,35 @@ public class TieredMessageFetcher {
flatFile.increaseReadAheadFactor();
}
-            int factor = Math.min(flatFile.getReadAheadFactor(), storeConfig.getReadAheadMessageCountThreshold() / maxMsgNums);
+            int factor = Math.min(flatFile.getReadAheadFactor(), storeConfig.getReadAheadMessageCountThreshold() / maxCount);
int flag = 0;
int concurrency = 1;
            if (factor > storeConfig.getReadAheadBatchSizeFactorThreshold()) {
                flag = factor % storeConfig.getReadAheadBatchSizeFactorThreshold() == 0 ? 0 : 1;
                concurrency = factor / storeConfig.getReadAheadBatchSizeFactorThreshold() + flag;
            }
-            int requestBatchSize = maxMsgNums * Math.min(factor, storeConfig.getReadAheadBatchSizeFactorThreshold());
+            int requestBatchSize = maxCount * Math.min(factor, storeConfig.getReadAheadBatchSizeFactorThreshold());
            List<Pair<Integer, CompletableFuture<Long>>> futureList = new ArrayList<>();
long nextQueueOffset = queueOffset;
if (flag == 1) {
-                int firstBatchSize = factor % storeConfig.getReadAheadBatchSizeFactorThreshold() * maxMsgNums;
-                CompletableFuture<Long> future = prefetchAndPutMsgToCache(flatFile, mq, nextQueueOffset, firstBatchSize);
+                int firstBatchSize = factor % storeConfig.getReadAheadBatchSizeFactorThreshold() * maxCount;
+                CompletableFuture<Long> future = prefetchMessageThenPutToCache(flatFile, mq, nextQueueOffset, firstBatchSize);
futureList.add(Pair.of(firstBatchSize, future));
nextQueueOffset += firstBatchSize;
}
for (long i = 0; i < concurrency - flag; i++) {
-                CompletableFuture<Long> future = prefetchAndPutMsgToCache(flatFile, mq, nextQueueOffset + i * requestBatchSize, requestBatchSize);
+                CompletableFuture<Long> future = prefetchMessageThenPutToCache(flatFile, mq, nextQueueOffset + i * requestBatchSize, requestBatchSize);
futureList.add(Pair.of(requestBatchSize, future));
}
-            flatFile.putInflightRequest(group, queueOffset, maxMsgNums * factor, futureList);
+            flatFile.putInflightRequest(group, queueOffset, maxCount * factor, futureList);
            LOGGER.debug("TieredMessageFetcher#preFetchMessage: try to prefetch messages for later requests: next begin offset: {}, request offset: {}, factor: {}, flag: {}, request batch: {}, concurrency: {}",
                nextBeginOffset, queueOffset, factor, flag, requestBatchSize, concurrency);
}
}
}
-    private CompletableFuture<Long> prefetchAndPutMsgToCache(CompositeQueueFlatFile flatFile, MessageQueue mq,
+    private CompletableFuture<Long> prefetchMessageThenPutToCache(CompositeQueueFlatFile flatFile, MessageQueue mq,
long queueOffset, int batchSize) {
return getMessageFromTieredStoreAsync(flatFile, queueOffset, batchSize)
.thenApplyAsync(result -> {
@@ -235,13 +246,14 @@ public class TieredMessageFetcher {
}, TieredStoreExecutor.fetchDataExecutor);
}
-    private CompletableFuture<GetMessageResult> getMessageFromCacheAsync(CompositeQueueFlatFile flatFile,
-        String group, long queueOffset, int maxMsgNums, boolean waitInflightRequest) {
+    public CompletableFuture<GetMessageResult> getMessageFromCacheAsync(CompositeQueueFlatFile flatFile,
+        String group, long queueOffset, int maxCount, boolean waitInflightRequest) {
+
MessageQueue mq = flatFile.getMessageQueue();
long lastGetOffset = queueOffset - 1;
-        List<SelectMappedBufferResultWrapper> resultWrapperList = new ArrayList<>(maxMsgNums);
-        for (int i = 0; i < maxMsgNums; i++) {
+        List<SelectMappedBufferResultWrapper> resultWrapperList = new ArrayList<>(maxCount);
+        for (int i = 0; i < maxCount; i++) {
+ for (int i = 0; i < maxCount; i++) {
lastGetOffset++;
            SelectMappedBufferResultWrapper wrapper = getMessageFromCache(flatFile, lastGetOffset);
if (wrapper == null) {
@@ -257,26 +269,26 @@ public class TieredMessageFetcher {
.put(TieredStoreMetricsConstant.LABEL_TOPIC, mq.getTopic())
.put(TieredStoreMetricsConstant.LABEL_GROUP, group)
.build();
- TieredStoreMetricsManager.cacheAccess.add(maxMsgNums, attributes);
+ TieredStoreMetricsManager.cacheAccess.add(maxCount, attributes);
            TieredStoreMetricsManager.cacheHit.add(resultWrapperList.size(), attributes);
}
        // if no cached message found and there is currently an inflight request, wait for the request to end before continuing
if (resultWrapperList.isEmpty() && waitInflightRequest) {
-            CompletableFuture<Long> future = flatFile.getInflightRequest(group, queueOffset, maxMsgNums)
+            CompletableFuture<Long> future = flatFile.getInflightRequest(group, queueOffset, maxCount)
.getFuture(queueOffset);
if (!future.isDone()) {
Stopwatch stopwatch = Stopwatch.createStarted();
                // to prevent starvation issues, only allow waiting for inflight request once
                return future.thenCompose(v -> {
                    LOGGER.debug("TieredMessageFetcher#getMessageFromCacheAsync: wait for inflight request cost: {}ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
-                    return getMessageFromCacheAsync(flatFile, group, queueOffset, maxMsgNums, false);
+                    return getMessageFromCacheAsync(flatFile, group, queueOffset, maxCount, false);
});
}
}
// try to get message from cache again when prefetch request is done
- for (int i = 0; i < maxMsgNums - resultWrapperList.size(); i++) {
+ for (int i = 0; i < maxCount - resultWrapperList.size(); i++) {
lastGetOffset++;
            SelectMappedBufferResultWrapper wrapper = getMessageFromCache(flatFile, lastGetOffset);
if (wrapper == null) {
@@ -288,11 +300,11 @@ public class TieredMessageFetcher {
recordCacheAccess(flatFile, group, queueOffset, resultWrapperList);
-        // if cache is hit, result will be returned immediately and asynchronously prefetch messages for later requests
+        // if cache hit, result will be returned immediately and asynchronously prefetch messages for later requests
if (!resultWrapperList.isEmpty()) {
            LOGGER.debug("TieredMessageFetcher#getMessageFromCacheAsync: cache hit: topic: {}, queue: {}, queue offset: {}, max message num: {}, cache hit num: {}",
-                mq.getTopic(), mq.getQueueId(), queueOffset, maxMsgNums, resultWrapperList.size());
-            preFetchMessage(flatFile, group, maxMsgNums, lastGetOffset + 1);
+                mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, resultWrapperList.size());
+ prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1);
GetMessageResult result = new GetMessageResult();
result.setStatus(GetMessageStatus.FOUND);
@@ -305,10 +317,10 @@ public class TieredMessageFetcher {
// if cache is miss, immediately pull messages
        LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: topic: {}, queue: {}, queue offset: {}, max message num: {}",
- mq.getTopic(), mq.getQueueId(), queueOffset, maxMsgNums);
+ mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
CompletableFuture<GetMessageResult> resultFuture;
synchronized (flatFile) {
- int batchSize = maxMsgNums * storeConfig.getReadAheadMinFactor();
+ int batchSize = maxCount * storeConfig.getReadAheadMinFactor();
            resultFuture = getMessageFromTieredStoreAsync(flatFile, queueOffset, batchSize)
.thenApplyAsync(result -> {
if (result.getStatus() != GetMessageStatus.FOUND) {
@@ -329,8 +341,8 @@ public class TieredMessageFetcher {
SelectMappedBufferResult msg = msgList.get(i);
// put message into cache
                        SelectMappedBufferResultWrapper resultWrapper = putMessageToCache(flatFile, offset, msg, minOffset, maxOffset, size, true);
-                        // try to meet maxMsgNums
-                        if (newResult.getMessageMapedList().size() < maxMsgNums) {
+                        // try to meet maxCount
+                        if (newResult.getMessageMapedList().size() < maxCount) {
                            newResult.addMessage(resultWrapper.getDuplicateResult(), offset);
}
}
@@ -349,6 +361,7 @@ public class TieredMessageFetcher {
    public CompletableFuture<GetMessageResult> getMessageFromTieredStoreAsync(CompositeQueueFlatFile flatFile,
long queueOffset, int batchSize) {
+
GetMessageResult result = new GetMessageResult();
result.setMinOffset(flatFile.getConsumeQueueMinOffset());
result.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
@@ -361,12 +374,15 @@ public class TieredMessageFetcher {
result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_ONE);
result.setNextBeginOffset(queueOffset);
return CompletableFuture.completedFuture(result);
+ case ILLEGAL_PARAM:
+ case ILLEGAL_OFFSET:
default:
result.setStatus(GetMessageStatus.OFFSET_FOUND_NULL);
result.setNextBeginOffset(queueOffset);
return CompletableFuture.completedFuture(result);
}
}
+
        CompletableFuture<ByteBuffer> readCommitLogFuture = readConsumeQueueFuture.thenComposeAsync(cqBuffer -> {
            long firstCommitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer);
            cqBuffer.position(cqBuffer.remaining() - TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
@@ -433,8 +449,10 @@ public class TieredMessageFetcher {
});
}
-    public CompletableFuture<GetMessageResult> getMessageAsync(String group, String topic, int queueId,
- long queueOffset, int maxMsgNums, final MessageFilter messageFilter) {
+ @Override
+ public CompletableFuture<GetMessageResult> getMessageAsync(
+        String group, String topic, int queueId, long queueOffset, int maxCount, final MessageFilter messageFilter) {
+
        CompositeQueueFlatFile flatFile = flatFileManager.getFlatFile(new MessageQueue(topic, brokerName, queueId));
if (flatFile == null) {
GetMessageResult result = new GetMessageResult();
@@ -442,10 +460,11 @@ public class TieredMessageFetcher {
result.setStatus(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE);
return CompletableFuture.completedFuture(result);
}
+
GetMessageResult result = new GetMessageResult();
long minQueueOffset = flatFile.getConsumeQueueMinOffset();
- result.setMinOffset(minQueueOffset);
long maxQueueOffset = flatFile.getConsumeQueueCommitOffset();
+ result.setMinOffset(minQueueOffset);
result.setMaxOffset(maxQueueOffset);
if (flatFile.getConsumeQueueCommitOffset() <= 0) {
@@ -468,24 +487,29 @@ public class TieredMessageFetcher {
return CompletableFuture.completedFuture(result);
}
-        return getMessageFromCacheAsync(flatFile, group, queueOffset, maxMsgNums);
+        return getMessageFromCacheAsync(flatFile, group, queueOffset, maxCount);
}
+ @Override
    public CompletableFuture<Long> getEarliestMessageTimeAsync(String topic, int queueId) {
        CompositeFlatFile flatFile = flatFileManager.getFlatFile(new MessageQueue(topic, brokerName, queueId));
if (flatFile == null) {
return CompletableFuture.completedFuture(-1L);
}
-        return flatFile.getCommitLogAsync(flatFile.getCommitLogMinOffset(), MessageBufferUtil.STORE_TIMESTAMP_POSITION + 8)
+        // read the earliest message up to and including its store timestamp field
+        int length = MessageBufferUtil.STORE_TIMESTAMP_POSITION + 8;
+        return flatFile.getCommitLogAsync(flatFile.getCommitLogMinOffset(), length)
.thenApply(MessageBufferUtil::getStoreTimeStamp);
}
+ @Override
    public CompletableFuture<Long> getMessageStoreTimeStampAsync(String topic, int queueId, long queueOffset) {
        CompositeFlatFile flatFile = flatFileManager.getFlatFile(new MessageQueue(topic, brokerName, queueId));
if (flatFile == null) {
return CompletableFuture.completedFuture(-1L);
}
+
return flatFile.getConsumeQueueAsync(queueOffset)
.thenComposeAsync(cqItem -> {
                long commitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqItem);
@@ -494,27 +518,33 @@ public class TieredMessageFetcher {
}, TieredStoreExecutor.fetchDataExecutor)
.thenApply(MessageBufferUtil::getStoreTimeStamp)
.exceptionally(e -> {
-                LOGGER.error("TieredMessageFetcher#getMessageStoreTimeStampAsync: get or decode message failed: topic: {}, queue: {}, offset: {}", topic, queueId, queueOffset, e);
+                LOGGER.error("TieredMessageFetcher#getMessageStoreTimeStampAsync: " +
+                    "get or decode message failed: topic: {}, queue: {}, offset: {}", topic, queueId, queueOffset, e);
return -1L;
});
}
-    public long getOffsetInQueueByTime(String topic, int queueId, long timestamp,
-        BoundaryType type) {
+    @Override
+    public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, BoundaryType type) {
        CompositeFlatFile flatFile = flatFileManager.getFlatFile(new MessageQueue(topic, brokerName, queueId));
if (flatFile == null) {
return -1L;
}
+
try {
return flatFile.getOffsetInConsumeQueueByTime(timestamp, type);
} catch (Exception e) {
-            LOGGER.error("TieredMessageFetcher#getOffsetInQueueByTime: get offset in queue by time failed: topic: {}, queue: {}, timestamp: {}, type: {}", topic, queueId, timestamp, type, e);
+            LOGGER.error("TieredMessageFetcher#getOffsetInQueueByTime: " +
+                "get offset in queue by time failed: topic: {}, queue: {}, timestamp: {}, type: {}",
+                topic, queueId, timestamp, type, e);
}
return -1L;
}
-    public CompletableFuture<QueryMessageResult> queryMessageAsync(String topic, String key, int maxNum, long begin,
- long end) {
+ @Override
+ public CompletableFuture<QueryMessageResult> queryMessageAsync(
+ String topic, String key, int maxCount, long begin, long end) {
+
        TieredIndexFile indexFile = TieredFlatFileManager.getIndexFile(storeConfig);
        int hashCode = TieredIndexFile.indexKeyHashMethod(TieredIndexFile.buildKey(topic, key));
@@ -522,12 +552,12 @@ public class TieredMessageFetcher {
try {
TopicMetadata topicMetadata = metadataStore.getTopic(topic);
if (topicMetadata == null) {
-                LOGGER.info("TieredMessageFetcher#queryMessageAsync: get topic id from metadata failed, topic metadata not found: topic: {}", topic);
+                LOGGER.info("TieredMessageFetcher#queryMessageAsync, topic metadata not found, topic: {}", topic);
                return CompletableFuture.completedFuture(new QueryMessageResult());
}
topicId = topicMetadata.getTopicId();
} catch (Exception e) {
-            LOGGER.error("TieredMessageFetcher#queryMessageAsync: get topic id from metadata failed: topic: {}", topic, e);
+            LOGGER.error("TieredMessageFetcher#queryMessageAsync, get topic id failed, topic: {}", topic, e);
return CompletableFuture.completedFuture(new QueryMessageResult());
}
@@ -535,15 +565,22 @@ public class TieredMessageFetcher {
.thenCompose(indexBufferList -> {
QueryMessageResult result = new QueryMessageResult();
int resultCount = 0;
-                List<CompletableFuture<Void>> futureList = new ArrayList<>(maxNum);
+                List<CompletableFuture<Void>> futureList = new ArrayList<>(maxCount);
for (Pair<Long, ByteBuffer> pair : indexBufferList) {
Long fileBeginTimestamp = pair.getKey();
ByteBuffer indexBuffer = pair.getValue();
+
                    if (indexBuffer.remaining() % TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE != 0) {
-                        LOGGER.error("[Bug]TieredMessageFetcher#queryMessageAsync: index buffer size {} is not multiple of index item size {}", indexBuffer.remaining(), TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE);
+                        LOGGER.error("[Bug] TieredMessageFetcher#queryMessageAsync: " +
+                            "index buffer size {} is not multiple of index item size {}",
+                            indexBuffer.remaining(), TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE);
continue;
}
-                    for (int indexOffset = indexBuffer.position(); indexOffset < indexBuffer.limit(); indexOffset += TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE) {
+
+                    for (int indexOffset = indexBuffer.position();
+                        indexOffset < indexBuffer.limit();
+                        indexOffset += TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE) {
+
                        int indexItemHashCode = indexBuffer.getInt(indexOffset);
if (indexItemHashCode != hashCode) {
continue;
@@ -555,11 +592,13 @@ public class TieredMessageFetcher {
}
int queueId = indexBuffer.getInt(indexOffset + 4 + 4);
-                        CompositeFlatFile flatFile = TieredFlatFileManager.getInstance(storeConfig).getFlatFile(new MessageQueue(topic, brokerName, queueId));
+                        CompositeFlatFile flatFile =
+                            flatFileManager.getFlatFile(new MessageQueue(topic, brokerName, queueId));
if (flatFile == null) {
continue;
}
+ // decode index item
                        long offset = indexBuffer.getLong(indexOffset + 4 + 4 + 4);
                        int size = indexBuffer.getInt(indexOffset + 4 + 4 + 4 + 8);
                        int timeDiff = indexBuffer.getInt(indexOffset + 4 + 4 + 4 + 8 + 4);
@@ -567,16 +606,19 @@ public class TieredMessageFetcher {
if (indexTimestamp < begin || indexTimestamp > end) {
continue;
}
+
                        CompletableFuture<Void> getMessageFuture = flatFile.getCommitLogAsync(offset, size)
-                            .thenAccept(messageBuffer -> result.addMessage(new SelectMappedBufferResult(0, messageBuffer, size, null)));
+                            .thenAccept(messageBuffer -> result.addMessage(
+                                new SelectMappedBufferResult(0, messageBuffer, size, null)));
futureList.add(getMessageFuture);
resultCount++;
- if (resultCount >= maxNum) {
+ if (resultCount >= maxCount) {
break;
}
}
- if (resultCount >= maxNum) {
+
+ if (resultCount >= maxCount) {
break;
}
}
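The query path above walks each 28-byte compact index item using fixed field offsets. A self-contained sketch of that decoding (the layout hash(4) + topicId(4) + queueId(4) + commitLog offset(8) + size(4) + time diff(4) is inferred from the hunk above; field names are illustrative):

    import java.nio.ByteBuffer;

    final class CompactIndexItemDecoder {
        static final int COMPACT_INDEX_ITEM_SIZE = 28;

        // Reads one compact index item starting at indexOffset.
        static void decode(ByteBuffer indexBuffer, int indexOffset) {
            int hashCode = indexBuffer.getInt(indexOffset);
            int topicId = indexBuffer.getInt(indexOffset + 4);
            int queueId = indexBuffer.getInt(indexOffset + 4 + 4);
            long commitLogOffset = indexBuffer.getLong(indexOffset + 4 + 4 + 4);
            int size = indexBuffer.getInt(indexOffset + 4 + 4 + 4 + 8);
            int timeDiff = indexBuffer.getInt(indexOffset + 4 + 4 + 4 + 8 + 4);
            System.out.printf("hash=%d topicId=%d queueId=%d offset=%d size=%d timeDiff=%d%n",
                hashCode, topicId, queueId, commitLogOffset, size, timeDiff);
        }
    }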
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
index 67b32c3a7..a71323348 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
@@ -493,16 +493,16 @@ public class TieredFlatFile {
fileSegment.destroyFile();
if (!fileSegment.exists()) {
                            tieredMetadataStore.deleteFileSegment(filePath, fileType, metadata.getBaseOffset());
-                            logger.info("expired file {} is been destroyed", fileSegment.getPath());
+                            logger.info("Destroyed expired file, file path: {}", fileSegment.getPath());
}
} catch (Exception e) {
-                        logger.error("destroy expired failed: file path: {}, file type: {}",
+                        logger.error("Destroy expired file failed, file path: {}, file type: {}",
filePath, fileType, e);
}
}
});
} catch (Exception e) {
-            logger.error("destroy expired file failed: file path: {}, file type: {}", filePath, fileType);
+            logger.error("Destroy expired file failed, file path: {}, file type: {}", filePath, fileType);
}
}
@@ -520,7 +520,7 @@ public class TieredFlatFile {
this.updateFileSegment(segment);
} catch (Exception e) {
                        // TODO handle update segment metadata failed exception
-                        logger.error("update file segment metadata failed: " +
+                        logger.error("Update file segment metadata failed: " +
                            "file path: {}, file type: {}, base offset: {}",
                            filePath, fileType, segment.getBaseOffset(), e);
}
@@ -531,7 +531,7 @@ public class TieredFlatFile {
);
}
} catch (Exception e) {
-            logger.error("commit file segment failed: topic: {}, queue: {}, file type: {}", filePath, fileType, e);
+            logger.error("Commit file segment failed: topic: {}, queue: {}, file type: {}", filePath, fileType, e);
}
if (sync) {
            CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
index 0acf4b197..50beb01ae 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
@@ -44,18 +44,21 @@ public class TieredIndexFile {
    private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
-    public static final int INDEX_FILE_BEGIN_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 4;
-    public static final int INDEX_FILE_END_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 8;
- public static final int INDEX_FILE_HEADER_SIZE = 28;
- public static final int INDEX_FILE_HASH_SLOT_SIZE = 8;
- public static final int INDEX_FILE_HASH_ORIGIN_INDEX_SIZE = 32;
- public static final int INDEX_FILE_HASH_COMPACT_INDEX_SIZE = 28;
-
+ // header format:
+    // magic code(4) + begin timestamp(8) + end timestamp(8) + slot num(4) + index num(4)
public static final int INDEX_FILE_HEADER_MAGIC_CODE_POSITION = 0;
public static final int INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION = 4;
public static final int INDEX_FILE_HEADER_END_TIME_STAMP_POSITION = 12;
public static final int INDEX_FILE_HEADER_SLOT_NUM_POSITION = 20;
public static final int INDEX_FILE_HEADER_INDEX_NUM_POSITION = 24;
+ public static final int INDEX_FILE_HEADER_SIZE = 28;
+
+ // index item
+    public static final int INDEX_FILE_BEGIN_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 4;
+    public static final int INDEX_FILE_END_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 8;
+ public static final int INDEX_FILE_HASH_SLOT_SIZE = 8;
+ public static final int INDEX_FILE_HASH_ORIGIN_INDEX_SIZE = 32;
+ public static final int INDEX_FILE_HASH_COMPACT_INDEX_SIZE = 28;
private static final String INDEX_FILE_DIR_NAME = "tiered_index_file";
private static final String CUR_INDEX_FILE_NAME = "0000";
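The reordered constants above spell out the 28-byte index file header. A self-contained sketch of reading such a header from a ByteBuffer, using the positions listed above (a hypothetical helper, not code from this commit):

    import java.nio.ByteBuffer;

    final class IndexFileHeaderReader {
        // header: magic code(4) + begin timestamp(8) + end timestamp(8) + slot num(4) + index num(4) = 28 bytes
        static void read(ByteBuffer header) {
            int magicCode = header.getInt(0);
            long beginTimestamp = header.getLong(4);
            long endTimestamp = header.getLong(12);
            int slotNum = header.getInt(20);
            int indexNum = header.getInt(24);
            System.out.printf("magic=%x begin=%d end=%d slots=%d indexes=%d%n",
                magicCode, beginTimestamp, endTimestamp, slotNum, indexNum);
        }
    }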
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
index 60f3b1468..3ca0fb614 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
@@ -259,14 +259,14 @@ public class TieredStoreMetricsManager {
cacheCount = meter.gaugeBuilder(GAUGE_CACHE_COUNT)
.setDescription("Tiered store cache message count")
.ofLongs()
-            .buildWithCallback(measurement -> measurement.record(fetcher.getReadAheadCache().estimatedSize(), newAttributesBuilder().build()));
+            .buildWithCallback(measurement -> measurement.record(fetcher.getMessageCache().estimatedSize(), newAttributesBuilder().build()));
cacheBytes = meter.gaugeBuilder(GAUGE_CACHE_BYTES)
.setDescription("Tiered store cache message bytes")
.setUnit("bytes")
.ofLongs()
.buildWithCallback(measurement -> {
-                Optional<Policy.Eviction<MessageCacheKey, SelectMappedBufferResultWrapper>> eviction = fetcher.getReadAheadCache().policy().eviction();
+                Optional<Policy.Eviction<MessageCacheKey, SelectMappedBufferResultWrapper>> eviction = fetcher.getMessageCache().policy().eviction();
                eviction.ifPresent(resultEviction -> measurement.record(resultEviction.weightedSize().orElse(0), newAttributesBuilder().build()));
});
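Both gauges above read straight from the fetcher's Caffeine cache. A minimal, self-contained sketch of obtaining the same two values from any weighted Caffeine cache (the String/byte[] cache here is illustrative, not this module's message cache):

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;

    final class CacheGaugeSample {
        public static void main(String[] args) {
            Cache<String, byte[]> cache = Caffeine.newBuilder()
                .maximumWeight(1024)
                .weigher((String key, byte[] value) -> value.length)
                .recordStats()
                .build();
            cache.put("msg-0", new byte[128]);

            long entryCount = cache.estimatedSize();             // feeds the cacheCount gauge
            long weightedBytes = cache.policy().eviction()
                .map(eviction -> eviction.weightedSize().orElse(0L))
                .orElse(0L);                                      // feeds the cacheBytes gauge
            System.out.println(entryCount + " entries, ~" + weightedBytes + " weighted bytes");
        }
    }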
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
index c988d42fa..c70bb7656 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
@@ -78,7 +78,8 @@ public class TieredCommitLogInputStream extends TieredFileSegmentInputStream {
commitLogOffset += readPosInCurBuffer;
readPosInCurBuffer = 0;
}
-        if (readPosInCurBuffer >= MessageBufferUtil.PHYSICAL_OFFSET_POSITION && readPosInCurBuffer < MessageBufferUtil.SYS_FLAG_OFFSET_POSITION) {
+        if (readPosInCurBuffer >= MessageBufferUtil.PHYSICAL_OFFSET_POSITION
+            && readPosInCurBuffer < MessageBufferUtil.SYS_FLAG_OFFSET_POSITION) {
            res = (int) ((commitLogOffset >> (8 * (MessageBufferUtil.SYS_FLAG_OFFSET_POSITION - readPosInCurBuffer - 1))) & 0xff);
readPosInCurBuffer++;
} else {
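The reflowed condition above marks the window where the stream rewrites the message's physical offset on the fly, emitting the 8-byte commit log offset big-endian, one byte per read. A self-contained sketch of that byte extraction (the positions below are placeholders, not the real MessageBufferUtil constants):

    final class BigEndianOffsetBytes {
        public static void main(String[] args) {
            long commitLogOffset = 0x0102030405060708L;
            int physicalOffsetPosition = 28;                      // assumed start of the 8-byte field
            int sysFlagPosition = physicalOffsetPosition + 8;     // first byte after the field

            for (int readPos = physicalOffsetPosition; readPos < sysFlagPosition; readPos++) {
                // same shift-and-mask as the hunk above: highest-order byte first
                int b = (int) ((commitLogOffset >> (8 * (sysFlagPosition - readPos - 1))) & 0xff);
                System.out.printf("%02x ", b);                    // prints 01 02 03 04 05 06 07 08
            }
            System.out.println();
        }
    }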
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
index 209afbbfc..df3720bab 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.tieredstore;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.SystemUtils;
import org.apache.commons.lang3.tuple.Triple;
@@ -141,9 +142,9 @@ public class TieredMessageFetcherTest {
Assert.assertNotNull(flatFile);
        fetcher.recordCacheAccess(flatFile, "prevent-invalid-cache", 0, new ArrayList<>());
- Assert.assertEquals(0, fetcher.readAheadCache.estimatedSize());
+ Assert.assertEquals(0, fetcher.getMessageCache().estimatedSize());
        fetcher.putMessageToCache(flatFile, 0, new SelectMappedBufferResult(0, msg1, msg1.remaining(), null), 0, 0, 1);
- Assert.assertEquals(1, fetcher.readAheadCache.estimatedSize());
+ Assert.assertEquals(1, fetcher.getMessageCache().estimatedSize());
        GetMessageResult getMessageResult = fetcher.getMessageFromCacheAsync(flatFile, "group", 0, 32).join();
        Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus());
@@ -151,21 +152,22 @@ public class TieredMessageFetcherTest {
        Assert.assertEquals(msg1, getMessageResult.getMessageBufferList().get(0));
Awaitility.waitAtMost(3, TimeUnit.SECONDS)
- .until(() -> fetcher.readAheadCache.estimatedSize() == 2);
+ .until(() -> fetcher.getMessageCache().estimatedSize() == 2);
        ArrayList<SelectMappedBufferResultWrapper> wrapperList = new ArrayList<>();
wrapperList.add(fetcher.getMessageFromCache(flatFile, 0));
        fetcher.recordCacheAccess(flatFile, "prevent-invalid-cache", 0, wrapperList);
- Assert.assertEquals(1, fetcher.readAheadCache.estimatedSize());
+ Assert.assertEquals(1, fetcher.getMessageCache().estimatedSize());
wrapperList.clear();
wrapperList.add(fetcher.getMessageFromCache(flatFile, 1));
        fetcher.recordCacheAccess(flatFile, "prevent-invalid-cache", 0, wrapperList);
- Assert.assertEquals(1, fetcher.readAheadCache.estimatedSize());
+ Assert.assertEquals(1, fetcher.getMessageCache().estimatedSize());
-        SelectMappedBufferResult messageFromCache = fetcher.getMessageFromCache(flatFile, 1).getDuplicateResult();
+        SelectMappedBufferResult messageFromCache =
+            Objects.requireNonNull(fetcher.getMessageFromCache(flatFile, 1)).getDuplicateResult();
fetcher.recordCacheAccess(flatFile, "group", 0, wrapperList);
Assert.assertNotNull(messageFromCache);
Assert.assertEquals(msg2, messageFromCache.getByteBuffer());
- Assert.assertEquals(0, fetcher.readAheadCache.estimatedSize());
+ Assert.assertEquals(0, fetcher.getMessageCache().estimatedSize());
}
@Test