This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 831fcc76cd [ISSUE #7363] Fix get message from tiered storage return incorrect next pull offset (#7365)
831fcc76cd is described below

commit 831fcc76cd7cd362bb6c136c287c624bb7eaf40a
Author: lizhimins <[email protected]>
AuthorDate: Tue Sep 19 10:04:04 2023 +0800

    [ISSUE #7363] Fix get message from tiered storage return incorrect next pull offset (#7365)
---
 .../rocketmq/tieredstore/TieredMessageFetcher.java |  2 +-
 .../rocketmq/tieredstore/TieredMessageStore.java   | 29 ++++++++++++----------
 .../tieredstore/TieredMessageStoreTest.java        |  5 ++--
 3 files changed, 20 insertions(+), 16 deletions(-)

diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
index 766ff64f6c..c948fa3fa1 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
@@ -319,7 +319,7 @@ public class TieredMessageFetcher implements MessageStoreFetcher {
         }
 
         // if cache is miss, immediately pull messages
-        LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " +
+        LOGGER.info("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " +
                 "topic: {}, queue: {}, queue offset: {}, max message num: {}",
             mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
 
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 9fb1b2f01c..d7d13d61e2 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -147,6 +147,11 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
     public CompletableFuture<GetMessageResult> getMessageAsync(String group, 
String topic,
         int queueId, long offset, int maxMsgNums, MessageFilter messageFilter) 
{
 
+        // For system topic, force reading from local store
+        if (TieredStoreUtil.isSystemTopic(topic) || 
PopAckConstants.isStartWithRevivePrefix(topic)) {
+            return next.getMessageAsync(group, topic, queueId, offset, 
maxMsgNums, messageFilter);
+        }
+
         if (fetchFromCurrentStore(topic, queueId, offset, maxMsgNums)) {
             logger.trace("GetMessageAsync from current store, topic: {}, 
queue: {}, offset: {}", topic, queueId, offset);
         } else {
@@ -158,6 +163,7 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
         return fetcher
             .getMessageAsync(group, topic, queueId, offset, maxMsgNums, 
messageFilter)
             .thenApply(result -> {
+
                 Attributes latencyAttributes = 
TieredStoreMetricsManager.newAttributesBuilder()
                     .put(TieredStoreMetricsConstant.LABEL_OPERATION, 
TieredStoreMetricsConstant.OPERATION_API_GET_MESSAGE)
                     .put(TieredStoreMetricsConstant.LABEL_TOPIC, topic)
@@ -166,8 +172,7 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
                 
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS),
 latencyAttributes);
 
                 if (result.getStatus() == GetMessageStatus.OFFSET_FOUND_NULL ||
-                    result.getStatus() == GetMessageStatus.OFFSET_OVERFLOW_ONE 
||
-                    result.getStatus() == 
GetMessageStatus.OFFSET_OVERFLOW_BADLY) {
+                    result.getStatus() == 
GetMessageStatus.NO_MATCHED_LOGIC_QUEUE) {
 
                     if (next.checkInStoreByConsumeOffset(topic, queueId, 
offset)) {
                         TieredStoreMetricsManager.fallbackTotal.add(1, 
latencyAttributes);
@@ -178,14 +183,8 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
                     }
                 }
 
-                // Fetch system topic data from the broker when using the 
force level.
-                if (result.getStatus() == 
GetMessageStatus.NO_MATCHED_LOGIC_QUEUE) {
-                    if (TieredStoreUtil.isSystemTopic(topic) || 
PopAckConstants.isStartWithRevivePrefix(topic)) {
-                        return next.getMessage(group, topic, queueId, offset, 
maxMsgNums, messageFilter);
-                    }
-                }
-
                 if (result.getStatus() != GetMessageStatus.FOUND &&
+                    result.getStatus() != 
GetMessageStatus.NO_MATCHED_LOGIC_QUEUE &&
                     result.getStatus() != GetMessageStatus.OFFSET_OVERFLOW_ONE 
&&
                     result.getStatus() != 
GetMessageStatus.OFFSET_OVERFLOW_BADLY) {
                     logger.warn("GetMessageAsync not found and message is not 
in next store, result: {}, " +
@@ -206,10 +205,14 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
                 if (minOffsetInQueue >= 0 && minOffsetInQueue < 
result.getMinOffset()) {
                     result.setMinOffset(minOffsetInQueue);
                 }
-                long maxOffsetInQueue = next.getMaxOffsetInQueue(topic, 
queueId);
-                if (maxOffsetInQueue >= 0 && maxOffsetInQueue > 
result.getMaxOffset()) {
-                    result.setMaxOffset(maxOffsetInQueue);
-                }
+
+                // In general, the local cq offset is slightly greater than the commit offset in read message,
+                // so there is no need to update the maximum offset to the local cq offset here,
+                // otherwise it will cause repeated consumption after next begin offset over commit offset.
+
+                logger.trace("GetMessageAsync result, group: {}, topic: {}, 
queueId: {}, offset: {}, count:{}, {}",
+                    group, topic, queueId, offset, maxMsgNums, result);
+
                 return result;
             }).exceptionally(e -> {
                 logger.error("GetMessageAsync from tiered store failed", e);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
index 2451199c28..07af1fc8b1 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
@@ -168,7 +168,7 @@ public class TieredMessageStoreTest {
         GetMessageResult result1 = new GetMessageResult();
         result1.setStatus(GetMessageStatus.FOUND);
         GetMessageResult result2 = new GetMessageResult();
-        result2.setStatus(GetMessageStatus.MESSAGE_WAS_REMOVING);
+        result2.setStatus(GetMessageStatus.OFFSET_OVERFLOW_BADLY);
 
         when(fetcher.getMessageAsync(anyString(), anyString(), anyInt(), 
anyLong(), anyInt(), 
any())).thenReturn(CompletableFuture.completedFuture(result1));
         when(nextStore.getMessage(anyString(), anyString(), anyInt(), 
anyLong(), anyInt(), any())).thenReturn(result2);
@@ -188,7 +188,8 @@ public class TieredMessageStoreTest {
         properties.setProperty("tieredStorageLevel", "3");
         configuration.update(properties);
         when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), 
anyLong())).thenReturn(true);
-        Assert.assertSame(result2, store.getMessage("group", mq.getTopic(), 
mq.getQueueId(), 0, 0, null));
+        Assert.assertEquals(result2.getStatus(),
+            store.getMessage("group", mq.getTopic(), mq.getQueueId(), 0, 0, 
null).getStatus());
     }
 
     @Test

Reply via email to