This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 093cb843ea [ISSUE #7974] Add repeatedly read same offset log to find unexpected situations (#7975)
093cb843ea is described below

commit 093cb843eaf661f99762df241e470f6e8433d7ee
Author: lizhimins <[email protected]>
AuthorDate: Wed Mar 27 18:51:01 2024 +0800

    [ISSUE #7974] Add repeatedly read same offset log to find unexpected situations (#7975)
---
 .../rocketmq/tieredstore/common/SelectBufferResult.java   |  7 +++++++
 .../tieredstore/core/MessageStoreFetcherImpl.java         | 15 +++++++++++++--
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java
index d265ed0fc4..cad37c7bc4 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.tieredstore.common;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class SelectBufferResult {
 
@@ -25,12 +26,14 @@ public class SelectBufferResult {
     private final long startOffset;
     private final int size;
     private final long tagCode;
+    private final AtomicLong accessCount;
 
     public SelectBufferResult(ByteBuffer byteBuffer, long startOffset, int size, long tagCode) {
         this.startOffset = startOffset;
         this.byteBuffer = byteBuffer;
         this.size = size;
         this.tagCode = tagCode;
+        this.accessCount = new AtomicLong();
     }
 
     public ByteBuffer getByteBuffer() {
@@ -48,4 +51,8 @@ public class SelectBufferResult {
     public long getTagCode() {
         return tagCode;
     }
+
+    public AtomicLong getAccessCount() {
+        return accessCount;
+    }
 }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
index 5403ebdc31..4ecf79658e 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
@@ -76,7 +76,10 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher {
 
         return Caffeine.newBuilder()
             .scheduler(Scheduler.systemScheduler())
-            .expireAfterWrite(storeConfig.getReadAheadCacheExpireDuration(), TimeUnit.MILLISECONDS)
+            // Clients may repeatedly request messages at the same offset in tiered storage,
+            // causing the request queue to become full. Using expire after read or write policy
+            // to refresh the cache expiration time.
+            .expireAfterAccess(storeConfig.getReadAheadCacheExpireDuration(), TimeUnit.MILLISECONDS)
             .maximumWeight(memoryMaxSize)
             // Using the buffer size of messages to calculate memory usage
             .weigher((String key, SelectBufferResult buffer) -> buffer.getSize())
@@ -98,7 +101,15 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher {
         SelectBufferResult buffer = this.fetcherCache.getIfPresent(
             String.format(CACHE_KEY_FORMAT, mq.getTopic(), mq.getQueueId(), offset));
         // return duplicate buffer here
-        return buffer == null ? null : new SelectBufferResult(
+        if (buffer == null) {
+            return null;
+        }
+        long count = buffer.getAccessCount().incrementAndGet();
+        if (count % 1000L == 0L) {
+            log.warn("MessageFetcher fetch same offset message too many times, " +
+                "topic={}, queueId={}, offset={}, count={}", mq.getTopic(), mq.getQueueId(), offset, count);
+        }
+        return new SelectBufferResult(
             buffer.getByteBuffer().asReadOnlyBuffer(), buffer.getStartOffset(), buffer.getSize(), buffer.getTagCode());
     }
 

Reply via email to