This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 8341c13d06 [ISSUE #8402] Fix init retry topic offset incorrect when
EscapeBridge enabled (#8404)
8341c13d06 is described below
commit 8341c13d064c96e9ef70da4fcf17e49d3e1847f9
Author: imzs <[email protected]>
AuthorDate: Tue Jul 23 10:24:12 2024 +0800
[ISSUE #8402] Fix init retry topic offset incorrect when EscapeBridge
enabled (#8404)
---
.../broker/processor/PopMessageProcessor.java | 10 ++--
.../broker/processor/PopReviveService.java | 4 +-
.../broker/processor/PopMessageProcessorTest.java | 63 +++++++++++++++++++++-
3 files changed, 68 insertions(+), 9 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 0304a5dab0..89b4c39d72 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -723,8 +723,8 @@ public class PopMessageProcessor implements NettyRequestProcessor {
private long getInitOffset(String topic, String group, int queueId, int
initMode, boolean init) {
long offset;
- if (ConsumeInitMode.MIN == initMode) {
- return
this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);
+ if (ConsumeInitMode.MIN == initMode ||
topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+ offset =
this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);
} else {
if
(this.brokerController.getBrokerConfig().isInitPopOffsetByCheckMsgInMem() &&
this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId) <=
0 &&
@@ -738,10 +738,10 @@ public class PopMessageProcessor implements NettyRequestProcessor {
offset = 0;
}
}
- if (init) {
- this.brokerController.getConsumerOffsetManager().commitOffset(
+ }
+ if (init) { // whichever initMode
+ this.brokerController.getConsumerOffsetManager().commitOffset(
"getPopOffset", group, topic, queueId, offset);
- }
}
return offset;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index e3ba492f28..8074af23bf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -125,7 +125,7 @@ public class PopReviveService extends ServiceThread {
msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME,
String.valueOf(popCheckPoint.getPopTime()));
}
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
- addRetryTopicIfNoExit(msgInner.getTopic(), popCheckPoint.getCId());
+ addRetryTopicIfNotExist(msgInner.getTopic(), popCheckPoint.getCId());
PutMessageResult putMessageResult =
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
PopMetricsManager.incPopReviveRetryMessageCount(popCheckPoint,
putMessageResult.getPutMessageStatus());
if (brokerController.getBrokerConfig().isEnablePopLog()) {
@@ -153,7 +153,7 @@ public class PopReviveService extends ServiceThread {
}
}
- private void addRetryTopicIfNoExit(String topic, String consumerGroup) {
+ public void addRetryTopicIfNotExist(String topic, String consumerGroup) {
if (brokerController != null) {
TopicConfig topicConfig =
brokerController.getTopicConfigManager().selectTopicConfig(topic);
if (topicConfig != null) {
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
index d8c8fa1034..8a2ce8a2ba 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.ConsumeInitMode;
import org.apache.rocketmq.common.message.MessageDecoder;
@@ -40,6 +41,7 @@ import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -53,6 +55,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -151,14 +154,70 @@ public class PopMessageProcessorTest {
assertThat(response.getRemark()).contains("pop message is forbidden
because timerWheelEnable is false");
}
+ @Test
+ public void testGetInitOffset_retryTopic() throws RemotingCommandException
{
+ when(messageStore.getMessageStoreConfig()).thenReturn(new
MessageStoreConfig());
+ String newGroup = group + "-" + System.currentTimeMillis();
+ String retryTopic = KeyBuilder.buildPopRetryTopic(topic, newGroup);
+ long minOffset = 100L;
+ when(messageStore.getMinOffsetInQueue(retryTopic,
0)).thenReturn(minOffset);
+
brokerController.getTopicConfigManager().getTopicConfigTable().put(retryTopic,
new TopicConfig(retryTopic, 1, 1));
+ GetMessageResult getMessageResult = createGetMessageResult(0);
+ when(messageStore.getMessageAsync(eq(newGroup), anyString(), anyInt(),
anyLong(), anyInt(), any()))
+
.thenReturn(CompletableFuture.completedFuture(getMessageResult));
+
+ long offset =
brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic,
0);
+ Assert.assertEquals(-1, offset);
+
+ RemotingCommand request = createPopMsgCommand(newGroup, topic, 0,
ConsumeInitMode.MAX);
+ popMessageProcessor.processRequest(handlerContext, request);
+ offset =
brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic,
0);
+ Assert.assertEquals(minOffset, offset);
+
+ when(messageStore.getMinOffsetInQueue(retryTopic,
0)).thenReturn(minOffset * 2);
+ popMessageProcessor.processRequest(handlerContext, request);
+ offset =
brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic,
0);
+ Assert.assertEquals(minOffset, offset); // will not entry
getInitOffset() again
+ messageStore.getMinOffsetInQueue(retryTopic, 0); // prevent
UnnecessaryStubbingException
+ }
+
+ @Test
+ public void testGetInitOffset_normalTopic() throws
RemotingCommandException {
+ long maxOffset = 999L;
+ when(messageStore.getMessageStoreConfig()).thenReturn(new
MessageStoreConfig());
+ when(messageStore.getMaxOffsetInQueue(topic, 0)).thenReturn(maxOffset);
+ String newGroup = group + "-" + System.currentTimeMillis();
+ GetMessageResult getMessageResult = createGetMessageResult(0);
+ when(messageStore.getMessageAsync(eq(newGroup), anyString(), anyInt(),
anyLong(), anyInt(), any()))
+
.thenReturn(CompletableFuture.completedFuture(getMessageResult));
+
+ long offset =
brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0);
+ Assert.assertEquals(-1, offset);
+
+ RemotingCommand request = createPopMsgCommand(newGroup, topic, 0,
ConsumeInitMode.MAX);
+ popMessageProcessor.processRequest(handlerContext, request);
+ offset =
brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0);
+ Assert.assertEquals(maxOffset - 1, offset); // checkInMem return false
+
+ when(messageStore.getMaxOffsetInQueue(topic, 0)).thenReturn(maxOffset
* 2);
+ popMessageProcessor.processRequest(handlerContext, request);
+ offset =
brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0);
+ Assert.assertEquals(maxOffset - 1, offset); // will not entry
getInitOffset() again
+ messageStore.getMaxOffsetInQueue(topic, 0); // prevent
UnnecessaryStubbingException
+ }
+
private RemotingCommand createPopMsgCommand() {
+ return createPopMsgCommand(group, topic, -1, ConsumeInitMode.MAX);
+ }
+
+ private RemotingCommand createPopMsgCommand(String group, String topic,
int queueId, int initMode) {
PopMessageRequestHeader requestHeader = new PopMessageRequestHeader();
requestHeader.setConsumerGroup(group);
requestHeader.setMaxMsgNums(30);
- requestHeader.setQueueId(-1);
+ requestHeader.setQueueId(queueId);
requestHeader.setTopic(topic);
requestHeader.setInvisibleTime(10_000);
- requestHeader.setInitMode(ConsumeInitMode.MAX);
+ requestHeader.setInitMode(initMode);
requestHeader.setOrder(false);
requestHeader.setPollTime(15_000);
requestHeader.setBornTime(System.currentTimeMillis());