This is an automated email from the ASF dual-hosted git repository.
caigy pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new af43a3e71f Fix exception when pop messages with multiple LMQ indexes
(#7863)
af43a3e71f is described below
commit af43a3e71f2bdb4765294f7d6314b1428737849d
Author: Liu Shengzhong <[email protected]>
AuthorDate: Tue Apr 30 12:46:47 2024 +0800
Fix exception when pop messages with multiple LMQ indexes (#7863)
---
.../rocketmq/client/impl/MQClientAPIImpl.java | 35 ++++++----
.../rocketmq/client/impl/MQClientAPIImplTest.java | 81 ++++++++++++++++++++++
2 files changed, 101 insertions(+), 15 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 12d305b612..0c58affa34 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.Validators;
@@ -1155,15 +1156,18 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
final Long msgQueueOffset;
if (MixAll.isLmq(topic) &&
messageExt.getReconsumeTimes() == 0 && StringUtils.isNotEmpty(
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) {
- // process LMQ, LMQ topic has only 1 queue, which
queue id is 0
+ // process LMQ
+ String[] queues =
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)
+ .split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+ String[] queueOffsets =
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)
+ .split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+ long offset =
Long.parseLong(queueOffsets[ArrayUtils.indexOf(queues, topic)]);
+ // LMQ topic has only 1 queue, which queue id is 0
queueIdKey =
ExtraInfoUtil.getStartOffsetInfoMapKey(topic, MixAll.LMQ_QUEUE_ID);
- queueOffsetKey =
ExtraInfoUtil.getQueueOffsetMapKey(topic, MixAll.LMQ_QUEUE_ID, Long.parseLong(
-
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)));
- index = sortMap.get(queueIdKey).indexOf(
-
Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)));
+ queueOffsetKey =
ExtraInfoUtil.getQueueOffsetMapKey(topic, MixAll.LMQ_QUEUE_ID, offset);
+ index = sortMap.get(queueIdKey).indexOf(offset);
msgQueueOffset =
msgOffsetInfo.get(queueIdKey).get(index);
- if (msgQueueOffset != Long.parseLong(
-
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET))) {
+ if (msgQueueOffset != offset) {
log.warn("Queue offset[%d] of msg is strange,
not equal to the stored in msg, %s",
msgQueueOffset, messageExt);
}
@@ -1217,14 +1221,15 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
final String key;
if (MixAll.isLmq(topic) && messageExt.getReconsumeTimes() == 0
&&
StringUtils.isNotEmpty(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)))
{
- // process LMQ, LMQ topic has only 1 queue, which queue id is 0
- key = ExtraInfoUtil.getStartOffsetInfoMapKey(
-
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH), 0);
- if (!sortMap.containsKey(key)) {
- sortMap.put(key, new ArrayList<>(4));
- }
- sortMap.get(key).add(
-
Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)));
+ // process LMQ
+ String[] queues =
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)
+ .split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+ String[] queueOffsets =
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)
+ .split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+ // LMQ topic has only 1 queue, which queue id is 0
+ key = ExtraInfoUtil.getStartOffsetInfoMapKey(topic,
MixAll.LMQ_QUEUE_ID);
+ sortMap.putIfAbsent(key, new ArrayList<>(4));
+
sortMap.get(key).add(Long.parseLong(queueOffsets[ArrayUtils.indexOf(queues,
topic)]));
continue;
}
// Value of POP_CK is used to determine whether it is a pop retry,
diff --git
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index 08e7fbe09a..dc892a3548 100644
---
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -41,6 +41,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
@@ -570,6 +571,86 @@ public class MQClientAPIImplTest {
done.await();
}
+ @Test
+ public void testPopMultiLmqMessage_async() throws Exception {
+ final long popTime = System.currentTimeMillis();
+ final int invisibleTime = 10 * 1000;
+ final String lmqTopic = MixAll.LMQ_PREFIX + "lmq1";
+ final String lmqTopic2 = MixAll.LMQ_PREFIX + "lmq2";
+ final String multiDispatch =
String.join(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER, lmqTopic, lmqTopic2);
+ final String multiOffset =
String.join(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER, "0", "0");
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock mock) throws Throwable {
+ InvokeCallback callback = mock.getArgument(3);
+ RemotingCommand request = mock.getArgument(1);
+ ResponseFuture responseFuture = new ResponseFuture(null,
request.getOpaque(), 3 * 1000, null, null);
+ RemotingCommand response =
RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque());
+
+ PopMessageResponseHeader responseHeader =
(PopMessageResponseHeader) response.readCustomHeader();
+ responseHeader.setInvisibleTime(invisibleTime);
+ responseHeader.setPopTime(popTime);
+ responseHeader.setReviveQid(0);
+ responseHeader.setRestNum(1);
+ StringBuilder startOffsetInfo = new StringBuilder(64);
+ ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, topic, 0,
0L);
+ responseHeader.setStartOffsetInfo(startOffsetInfo.toString());
+ StringBuilder msgOffsetInfo = new StringBuilder(64);
+ ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, topic, 0,
Collections.singletonList(0L));
+ responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString());
+ response.setRemark("FOUND");
+ response.makeCustomHeaderToNet();
+
+ MessageExt message = new MessageExt();
+ message.setQueueId(0);
+ message.setFlag(0);
+ message.setQueueOffset(10L);
+ message.setCommitLogOffset(10000L);
+ message.setSysFlag(0);
+ message.setBornTimestamp(System.currentTimeMillis());
+ message.setBornHost(new InetSocketAddress("127.0.0.1", 10));
+ message.setStoreTimestamp(System.currentTimeMillis());
+ message.setStoreHost(new InetSocketAddress("127.0.0.1", 11));
+ message.setBody("body".getBytes());
+ message.setTopic(topic);
+ MessageAccessor.putProperty(message,
MessageConst.PROPERTY_INNER_MULTI_DISPATCH, multiDispatch);
+ MessageAccessor.putProperty(message,
MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, multiOffset);
+ response.setBody(MessageDecoder.encode(message, false));
+ responseFuture.setResponseCommand(response);
+ callback.operationSucceed(responseFuture.getResponseCommand());
+ return null;
+ }
+ }).when(remotingClient).invokeAsync(anyString(),
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+ final CountDownLatch done = new CountDownLatch(1);
+ final PopMessageRequestHeader requestHeader = new
PopMessageRequestHeader();
+ requestHeader.setTopic(lmqTopic);
+ mqClientAPI.popMessageAsync(brokerName, brokerAddr, requestHeader, 10
* 1000, new PopCallback() {
+ @Override
+ public void onSuccess(PopResult popResult) {
+
assertThat(popResult.getPopStatus()).isEqualTo(PopStatus.FOUND);
+ assertThat(popResult.getRestNum()).isEqualTo(1);
+
assertThat(popResult.getInvisibleTime()).isEqualTo(invisibleTime);
+ assertThat(popResult.getPopTime()).isEqualTo(popTime);
+ assertThat(popResult.getMsgFoundList()).size().isEqualTo(1);
+
assertThat(popResult.getMsgFoundList().get(0).getTopic()).isEqualTo(lmqTopic);
+
assertThat(popResult.getMsgFoundList().get(0).getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))
+ .isEqualTo(multiDispatch);
+
assertThat(popResult.getMsgFoundList().get(0).getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET))
+ .isEqualTo(multiOffset);
+ done.countDown();
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ Assertions.fail("want no exception but got one", e);
+ done.countDown();
+ }
+ });
+ done.await();
+ }
+
@Test
public void testAckMessageAsync_Success() throws Exception {
doAnswer(new Answer<Void>() {