This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 093cb843ea [ISSUE #7974] Add repeatedly read same offset log to find
unexpected situations (#7975)
093cb843ea is described below
commit 093cb843eaf661f99762df241e470f6e8433d7ee
Author: lizhimins <[email protected]>
AuthorDate: Wed Mar 27 18:51:01 2024 +0800
[ISSUE #7974] Add repeatedly read same offset log to find unexpected
situations (#7975)
---
.../rocketmq/tieredstore/common/SelectBufferResult.java | 7 +++++++
.../tieredstore/core/MessageStoreFetcherImpl.java | 15 +++++++++++++--
2 files changed, 20 insertions(+), 2 deletions(-)
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java
index d265ed0fc4..cad37c7bc4 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.tieredstore.common;
import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
public class SelectBufferResult {
@@ -25,12 +26,14 @@ public class SelectBufferResult {
private final long startOffset;
private final int size;
private final long tagCode;
+ private final AtomicLong accessCount;
public SelectBufferResult(ByteBuffer byteBuffer, long startOffset, int
size, long tagCode) {
this.startOffset = startOffset;
this.byteBuffer = byteBuffer;
this.size = size;
this.tagCode = tagCode;
+ this.accessCount = new AtomicLong();
}
public ByteBuffer getByteBuffer() {
@@ -48,4 +51,8 @@ public class SelectBufferResult {
public long getTagCode() {
return tagCode;
}
+
+ public AtomicLong getAccessCount() {
+ return accessCount;
+ }
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
index 5403ebdc31..4ecf79658e 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
@@ -76,7 +76,10 @@ public class MessageStoreFetcherImpl implements
MessageStoreFetcher {
return Caffeine.newBuilder()
.scheduler(Scheduler.systemScheduler())
- .expireAfterWrite(storeConfig.getReadAheadCacheExpireDuration(),
TimeUnit.MILLISECONDS)
+ // Clients may repeatedly request messages at the same offset in
tiered storage,
+ // causing the request queue to become full. Using expire after
read or write policy
+ // to refresh the cache expiration time.
+ .expireAfterAccess(storeConfig.getReadAheadCacheExpireDuration(),
TimeUnit.MILLISECONDS)
.maximumWeight(memoryMaxSize)
// Using the buffer size of messages to calculate memory usage
.weigher((String key, SelectBufferResult buffer) ->
buffer.getSize())
@@ -98,7 +101,15 @@ public class MessageStoreFetcherImpl implements
MessageStoreFetcher {
SelectBufferResult buffer = this.fetcherCache.getIfPresent(
String.format(CACHE_KEY_FORMAT, mq.getTopic(), mq.getQueueId(),
offset));
// return duplicate buffer here
- return buffer == null ? null : new SelectBufferResult(
+ if (buffer == null) {
+ return null;
+ }
+ long count = buffer.getAccessCount().incrementAndGet();
+ if (count % 1000L == 0L) {
+ log.warn("MessageFetcher fetch same offset message too many times,
" +
+ "topic={}, queueId={}, offset={}, count={}", mq.getTopic(),
mq.getQueueId(), offset, count);
+ }
+ return new SelectBufferResult(
buffer.getByteBuffer().asReadOnlyBuffer(),
buffer.getStartOffset(), buffer.getSize(), buffer.getTagCode());
}