This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 831fcc76cd [ISSUE #7363] Fix get message from tiered storage return
incorrect next pull offset (#7365)
831fcc76cd is described below
commit 831fcc76cd7cd362bb6c136c287c624bb7eaf40a
Author: lizhimins <[email protected]>
AuthorDate: Tue Sep 19 10:04:04 2023 +0800
[ISSUE #7363] Fix get message from tiered storage return incorrect next
pull offset (#7365)
---
.../rocketmq/tieredstore/TieredMessageFetcher.java | 2 +-
.../rocketmq/tieredstore/TieredMessageStore.java | 29 ++++++++++++----------
.../tieredstore/TieredMessageStoreTest.java | 5 ++--
3 files changed, 20 insertions(+), 16 deletions(-)
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
index 766ff64f6c..c948fa3fa1 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
@@ -319,7 +319,7 @@ public class TieredMessageFetcher implements
MessageStoreFetcher {
}
// if cache is miss, immediately pull messages
- LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache
miss: " +
+ LOGGER.info("TieredMessageFetcher#getMessageFromCacheAsync: cache
miss: " +
"topic: {}, queue: {}, queue offset: {}, max message num: {}",
mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 9fb1b2f01c..d7d13d61e2 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -147,6 +147,11 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
public CompletableFuture<GetMessageResult> getMessageAsync(String group,
String topic,
int queueId, long offset, int maxMsgNums, MessageFilter messageFilter)
{
+ // For system topic, force reading from local store
+ if (TieredStoreUtil.isSystemTopic(topic) ||
PopAckConstants.isStartWithRevivePrefix(topic)) {
+ return next.getMessageAsync(group, topic, queueId, offset,
maxMsgNums, messageFilter);
+ }
+
if (fetchFromCurrentStore(topic, queueId, offset, maxMsgNums)) {
logger.trace("GetMessageAsync from current store, topic: {},
queue: {}, offset: {}", topic, queueId, offset);
} else {
@@ -158,6 +163,7 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
return fetcher
.getMessageAsync(group, topic, queueId, offset, maxMsgNums,
messageFilter)
.thenApply(result -> {
+
Attributes latencyAttributes =
TieredStoreMetricsManager.newAttributesBuilder()
.put(TieredStoreMetricsConstant.LABEL_OPERATION,
TieredStoreMetricsConstant.OPERATION_API_GET_MESSAGE)
.put(TieredStoreMetricsConstant.LABEL_TOPIC, topic)
@@ -166,8 +172,7 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS),
latencyAttributes);
if (result.getStatus() == GetMessageStatus.OFFSET_FOUND_NULL ||
- result.getStatus() == GetMessageStatus.OFFSET_OVERFLOW_ONE
||
- result.getStatus() ==
GetMessageStatus.OFFSET_OVERFLOW_BADLY) {
+ result.getStatus() ==
GetMessageStatus.NO_MATCHED_LOGIC_QUEUE) {
if (next.checkInStoreByConsumeOffset(topic, queueId,
offset)) {
TieredStoreMetricsManager.fallbackTotal.add(1,
latencyAttributes);
@@ -178,14 +183,8 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
}
}
- // Fetch system topic data from the broker when using the
force level.
- if (result.getStatus() ==
GetMessageStatus.NO_MATCHED_LOGIC_QUEUE) {
- if (TieredStoreUtil.isSystemTopic(topic) ||
PopAckConstants.isStartWithRevivePrefix(topic)) {
- return next.getMessage(group, topic, queueId, offset,
maxMsgNums, messageFilter);
- }
- }
-
if (result.getStatus() != GetMessageStatus.FOUND &&
+ result.getStatus() !=
GetMessageStatus.NO_MATCHED_LOGIC_QUEUE &&
result.getStatus() != GetMessageStatus.OFFSET_OVERFLOW_ONE
&&
result.getStatus() !=
GetMessageStatus.OFFSET_OVERFLOW_BADLY) {
logger.warn("GetMessageAsync not found and message is not
in next store, result: {}, " +
@@ -206,10 +205,14 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
if (minOffsetInQueue >= 0 && minOffsetInQueue <
result.getMinOffset()) {
result.setMinOffset(minOffsetInQueue);
}
- long maxOffsetInQueue = next.getMaxOffsetInQueue(topic,
queueId);
- if (maxOffsetInQueue >= 0 && maxOffsetInQueue >
result.getMaxOffset()) {
- result.setMaxOffset(maxOffsetInQueue);
- }
+
+ // In general, the local cq offset is slightly greater than
the commit offset in read message,
+ // so there is no need to update the maximum offset to the
local cq offset here,
+ // otherwise it will cause repeated consumption after next
begin offset over commit offset.
+
+ logger.trace("GetMessageAsync result, group: {}, topic: {},
queueId: {}, offset: {}, count:{}, {}",
+ group, topic, queueId, offset, maxMsgNums, result);
+
return result;
}).exceptionally(e -> {
logger.error("GetMessageAsync from tiered store failed", e);
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
index 2451199c28..07af1fc8b1 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
@@ -168,7 +168,7 @@ public class TieredMessageStoreTest {
GetMessageResult result1 = new GetMessageResult();
result1.setStatus(GetMessageStatus.FOUND);
GetMessageResult result2 = new GetMessageResult();
- result2.setStatus(GetMessageStatus.MESSAGE_WAS_REMOVING);
+ result2.setStatus(GetMessageStatus.OFFSET_OVERFLOW_BADLY);
when(fetcher.getMessageAsync(anyString(), anyString(), anyInt(),
anyLong(), anyInt(),
any())).thenReturn(CompletableFuture.completedFuture(result1));
when(nextStore.getMessage(anyString(), anyString(), anyInt(),
anyLong(), anyInt(), any())).thenReturn(result2);
@@ -188,7 +188,8 @@ public class TieredMessageStoreTest {
properties.setProperty("tieredStorageLevel", "3");
configuration.update(properties);
when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(),
anyLong())).thenReturn(true);
- Assert.assertSame(result2, store.getMessage("group", mq.getTopic(),
mq.getQueueId(), 0, 0, null));
+ Assert.assertEquals(result2.getStatus(),
+ store.getMessage("group", mq.getTopic(), mq.getQueueId(), 0, 0,
null).getStatus());
}
@Test