This is an automated email from the ASF dual-hosted git repository.
pingww pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new b44a1ebc36 [ISSUE apache#6576] Fix pop lmq message in client (#6577)
b44a1ebc36 is described below
commit b44a1ebc36114f67a7c7a5dc03b44b766bdc8d9f
Author: cnScarb <[email protected]>
AuthorDate: Tue Jun 13 16:29:05 2023 +0800
[ISSUE apache#6576] Fix pop lmq message in client (#6577)
---
.../rocketmq/client/impl/MQClientAPIImpl.java | 147 ++++++++++++++-------
.../rocketmq/client/impl/MQClientAPIImplTest.java | 77 +++++++++++
.../java/org/apache/rocketmq/common/MixAll.java | 1 +
3 files changed, 177 insertions(+), 48 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 995362bb77..d89d3f93b7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1063,45 +1063,64 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
PopResult popResult = new PopResult(popStatus, msgFoundList);
PopMessageResponseHeader responseHeader = (PopMessageResponseHeader)
response.decodeCommandCustomHeader(PopMessageResponseHeader.class);
popResult.setRestNum(responseHeader.getRestNum());
+ if (popStatus != PopStatus.FOUND) {
+ return popResult;
+ }
// it is a pop command if pop time greater than 0, we should set the
check point info to extraInfo field
- if (popStatus == PopStatus.FOUND) {
- Map<String, Long> startOffsetInfo = null;
- Map<String, List<Long>> msgOffsetInfo = null;
- Map<String, Integer> orderCountInfo = null;
+ Map<String, Long> startOffsetInfo = null;
+ Map<String, List<Long>> msgOffsetInfo = null;
+ Map<String, Integer> orderCountInfo = null;
+ if (requestHeader instanceof PopMessageRequestHeader) {
+ popResult.setInvisibleTime(responseHeader.getInvisibleTime());
+ popResult.setPopTime(responseHeader.getPopTime());
+ startOffsetInfo =
ExtraInfoUtil.parseStartOffsetInfo(responseHeader.getStartOffsetInfo());
+ msgOffsetInfo =
ExtraInfoUtil.parseMsgOffsetInfo(responseHeader.getMsgOffsetInfo());
+ orderCountInfo =
ExtraInfoUtil.parseOrderCountInfo(responseHeader.getOrderCountInfo());
+ }
+ Map<String/*topicMark@queueId*/, List<Long>/*msg queueOffset*/> sortMap
+ = buildQueueOffsetSortedMap(topic, msgFoundList);
+ Map<String, String> map = new HashMap<>(5);
+ for (MessageExt messageExt : msgFoundList) {
if (requestHeader instanceof PopMessageRequestHeader) {
- popResult.setInvisibleTime(responseHeader.getInvisibleTime());
- popResult.setPopTime(responseHeader.getPopTime());
- startOffsetInfo =
ExtraInfoUtil.parseStartOffsetInfo(responseHeader.getStartOffsetInfo());
- msgOffsetInfo =
ExtraInfoUtil.parseMsgOffsetInfo(responseHeader.getMsgOffsetInfo());
- orderCountInfo =
ExtraInfoUtil.parseOrderCountInfo(responseHeader.getOrderCountInfo());
- }
- Map<String/*topicMark@queueId*/, List<Long>/*msg queueOffset*/>
sortMap = new HashMap<>(16);
- for (MessageExt messageExt : msgFoundList) {
- String key =
ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(),
messageExt.getQueueId());
- if (!sortMap.containsKey(key)) {
- sortMap.put(key, new ArrayList<>(4));
- }
- sortMap.get(key).add(messageExt.getQueueOffset());
- }
- Map<String, String> map = new HashMap<>(5);
- for (MessageExt messageExt : msgFoundList) {
- if (requestHeader instanceof PopMessageRequestHeader) {
- if (startOffsetInfo == null) {
- // we should set the check point info to extraInfo
field , if the command is popMsg
- // find pop ck offset
- String key = messageExt.getTopic() +
messageExt.getQueueId();
- if (!map.containsKey(messageExt.getTopic() +
messageExt.getQueueId())) {
- map.put(key,
ExtraInfoUtil.buildExtraInfo(messageExt.getQueueOffset(),
responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
responseHeader.getReviveQid(),
- messageExt.getTopic(), brokerName,
messageExt.getQueueId()));
+ if (startOffsetInfo == null) {
+ // we should set the check point info to extraInfo field ,
if the command is popMsg
+ // find pop ck offset
+ String key = messageExt.getTopic() +
messageExt.getQueueId();
+ if (!map.containsKey(messageExt.getTopic() +
messageExt.getQueueId())) {
+ map.put(key,
ExtraInfoUtil.buildExtraInfo(messageExt.getQueueOffset(),
responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
responseHeader.getReviveQid(),
+ messageExt.getTopic(), brokerName,
messageExt.getQueueId()));
- }
-
messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, map.get(key) +
MessageConst.KEY_SEPARATOR + messageExt.getQueueOffset());
- } else {
- if
(messageExt.getProperty(MessageConst.PROPERTY_POP_CK) == null) {
- String queueIdKey =
ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(),
messageExt.getQueueId());
- String queueOffsetKey =
ExtraInfoUtil.getQueueOffsetMapKey(messageExt.getTopic(),
messageExt.getQueueId(), messageExt.getQueueOffset());
- int index =
sortMap.get(queueIdKey).indexOf(messageExt.getQueueOffset());
- Long msgQueueOffset =
msgOffsetInfo.get(queueIdKey).get(index);
+ }
+
messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, map.get(key) +
MessageConst.KEY_SEPARATOR + messageExt.getQueueOffset());
+ } else {
+ if (messageExt.getProperty(MessageConst.PROPERTY_POP_CK)
== null) {
+ final String queueIdKey;
+ final String queueOffsetKey;
+ final int index;
+ final Long msgQueueOffset;
+ if (MixAll.isLmq(topic) &&
messageExt.getReconsumeTimes() == 0 && StringUtils.isNotEmpty(
+
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) {
+ // process LMQ, LMQ topic has only 1 queue, which
queue id is 0
+ queueIdKey =
ExtraInfoUtil.getStartOffsetInfoMapKey(topic, MixAll.LMQ_QUEUE_ID);
+ queueOffsetKey =
ExtraInfoUtil.getQueueOffsetMapKey(topic, MixAll.LMQ_QUEUE_ID, Long.parseLong(
+
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)));
+ index = sortMap.get(queueIdKey).indexOf(
+
Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)));
+ msgQueueOffset =
msgOffsetInfo.get(queueIdKey).get(index);
+ if (msgQueueOffset != Long.parseLong(
+
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET))) {
+ log.warn("Queue offset[%d] of msg is strange,
not equal to the stored in msg, %s",
+ msgQueueOffset, messageExt);
+ }
+
messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK,
+
ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(queueIdKey),
responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
+ responseHeader.getReviveQid(), topic,
brokerName, 0, msgQueueOffset)
+ );
+ } else {
+ queueIdKey =
ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(),
messageExt.getQueueId());
+ queueOffsetKey =
ExtraInfoUtil.getQueueOffsetMapKey(messageExt.getTopic(),
messageExt.getQueueId(), messageExt.getQueueOffset());
+ index =
sortMap.get(queueIdKey).indexOf(messageExt.getQueueOffset());
+ msgQueueOffset =
msgOffsetInfo.get(queueIdKey).get(index);
if (msgQueueOffset != messageExt.getQueueOffset())
{
log.warn("Queue offset[%d] of msg is strange,
not equal to the stored in msg, %s", msgQueueOffset, messageExt);
}
@@ -1109,27 +1128,59 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(queueIdKey),
responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
responseHeader.getReviveQid(),
messageExt.getTopic(), brokerName, messageExt.getQueueId(), msgQueueOffset)
);
- if (((PopMessageRequestHeader)
requestHeader).isOrder() && orderCountInfo != null) {
- Integer count =
orderCountInfo.get(queueOffsetKey);
- if (count == null) {
- count = orderCountInfo.get(queueIdKey);
- }
- if (count != null && count > 0) {
- messageExt.setReconsumeTimes(count);
- }
+ }
+ if (((PopMessageRequestHeader)
requestHeader).isOrder() && orderCountInfo != null) {
+ Integer count = orderCountInfo.get(queueOffsetKey);
+ if (count == null) {
+ count = orderCountInfo.get(queueIdKey);
+ }
+ if (count != null && count > 0) {
+ messageExt.setReconsumeTimes(count);
}
}
}
- messageExt.getProperties().computeIfAbsent(
- MessageConst.PROPERTY_FIRST_POP_TIME, k ->
String.valueOf(responseHeader.getPopTime()));
}
- messageExt.setBrokerName(brokerName);
- messageExt.setTopic(NamespaceUtil.withoutNamespace(topic,
this.clientConfig.getNamespace()));
+ messageExt.getProperties().computeIfAbsent(
+ MessageConst.PROPERTY_FIRST_POP_TIME, k ->
String.valueOf(responseHeader.getPopTime()));
}
+ messageExt.setBrokerName(brokerName);
+ messageExt.setTopic(NamespaceUtil.withoutNamespace(topic,
this.clientConfig.getNamespace()));
}
return popResult;
}
+ /**
+ * Build queue offset sorted map
+ *
+ * @param topic pop consumer topic
+ * @param msgFoundList popped message list
+ * @return sorted map, key is topicMark@queueId, value is sorted msg
queueOffset list
+ */
+ private static Map<String, List<Long>> buildQueueOffsetSortedMap(String
topic, List<MessageExt> msgFoundList) {
+ Map<String/*topicMark@queueId*/, List<Long>/*msg queueOffset*/>
sortMap = new HashMap<>(16);
+ for (MessageExt messageExt : msgFoundList) {
+ final String key;
+ if (MixAll.isLmq(topic) && messageExt.getReconsumeTimes() == 0
+ &&
StringUtils.isNotEmpty(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)))
{
+ // process LMQ, LMQ topic has only 1 queue, which queue id is 0
+ key = ExtraInfoUtil.getStartOffsetInfoMapKey(
+
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH), 0);
+ if (!sortMap.containsKey(key)) {
+ sortMap.put(key, new ArrayList<>(4));
+ }
+ sortMap.get(key).add(
+
Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)));
+ continue;
+ }
+ key =
ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(),
messageExt.getQueueId());
+ if (!sortMap.containsKey(key)) {
+ sortMap.put(key, new ArrayList<>(4));
+ }
+ sortMap.get(key).add(messageExt.getQueueOffset());
+ }
+ return sortMap;
+ }
+
public MessageExt viewMessage(final String addr, final long phyoffset,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
ViewMessageRequestHeader requestHeader = new
ViewMessageRequestHeader();
diff --git
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index 890301a48b..d13f2cfe43 100644
---
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -38,6 +38,7 @@ import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.AclConfig;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
@@ -496,6 +497,82 @@ public class MQClientAPIImplTest {
done.await();
}
+ @Test
+ public void testPopLmqMessage_async() throws Exception {
+ final long popTime = System.currentTimeMillis();
+ final int invisibleTime = 10 * 1000;
+ final String lmqTopic = MixAll.LMQ_PREFIX + "lmq1";
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock mock) throws Throwable {
+ InvokeCallback callback = mock.getArgument(3);
+ RemotingCommand request = mock.getArgument(1);
+ ResponseFuture responseFuture = new ResponseFuture(null,
request.getOpaque(), 3 * 1000, null, null);
+ RemotingCommand response =
RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque());
+
+ PopMessageResponseHeader responseHeader =
(PopMessageResponseHeader) response.readCustomHeader();
+ responseHeader.setInvisibleTime(invisibleTime);
+ responseHeader.setPopTime(popTime);
+ responseHeader.setReviveQid(0);
+ responseHeader.setRestNum(1);
+ StringBuilder startOffsetInfo = new StringBuilder(64);
+ ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, false, 0,
0L);
+ responseHeader.setStartOffsetInfo(startOffsetInfo.toString());
+ StringBuilder msgOffsetInfo = new StringBuilder(64);
+ ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, false, 0,
Collections.singletonList(0L));
+ responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString());
+ response.setRemark("FOUND");
+ response.makeCustomHeaderToNet();
+
+ MessageExt message = new MessageExt();
+ message.setQueueId(3);
+ message.setFlag(0);
+ message.setQueueOffset(5L);
+ message.setCommitLogOffset(11111L);
+ message.setSysFlag(0);
+ message.setBornTimestamp(System.currentTimeMillis());
+ message.setBornHost(new InetSocketAddress("127.0.0.1", 10));
+ message.setStoreTimestamp(System.currentTimeMillis());
+ message.setStoreHost(new InetSocketAddress("127.0.0.1", 11));
+ message.setBody("body".getBytes());
+ message.setTopic(topic);
+ message.putUserProperty("key", "value");
+
message.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH, lmqTopic);
+
message.getProperties().put(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET,
String.valueOf(0));
+ response.setBody(MessageDecoder.encode(message, false));
+ responseFuture.setResponseCommand(response);
+ callback.operationComplete(responseFuture);
+ return null;
+ }
+ }).when(remotingClient).invokeAsync(anyString(),
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+ final CountDownLatch done = new CountDownLatch(1);
+ final PopMessageRequestHeader requestHeader = new
PopMessageRequestHeader();
+ requestHeader.setTopic(lmqTopic);
+ mqClientAPI.popMessageAsync(brokerName, brokerAddr, requestHeader, 10
* 1000, new PopCallback() {
+ @Override
+ public void onSuccess(PopResult popResult) {
+
assertThat(popResult.getPopStatus()).isEqualTo(PopStatus.FOUND);
+ assertThat(popResult.getRestNum()).isEqualTo(1);
+
assertThat(popResult.getInvisibleTime()).isEqualTo(invisibleTime);
+ assertThat(popResult.getPopTime()).isEqualTo(popTime);
+ assertThat(popResult.getMsgFoundList()).size().isEqualTo(1);
+
assertThat(popResult.getMsgFoundList().get(0).getTopic()).isEqualTo(lmqTopic);
+
assertThat(popResult.getMsgFoundList().get(0).getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))
+ .isEqualTo(lmqTopic);
+ done.countDown();
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ Assertions.fail("want no exception but got one", e);
+ done.countDown();
+ }
+ });
+ done.await();
+ }
+
@Test
public void testAckMessageAsync_Success() throws Exception {
doAnswer(new Answer<Void>() {
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index dc1d69fe10..6b3d2d6ae0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -97,6 +97,7 @@ public class MixAll {
public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
public static final String REPLY_MESSAGE_FLAG = "reply";
public static final String LMQ_PREFIX = "%LMQ%";
+ public static final long LMQ_QUEUE_ID = 0;
public static final String MULTI_DISPATCH_QUEUE_SPLITTER = ",";
public static final String REQ_T = "ReqT";
public static final String ROCKETMQ_ZONE_ENV = "ROCKETMQ_ZONE";