lizhimins opened a new issue, #5568: URL: https://github.com/apache/rocketmq/issues/5568
支持长度较长的 group 和 topic 进行 pop 模式消费 问题详细描述: pop 消费模式下的重试消息,会存入 %RETRY%Group_Topic 的 Topic 中,底层存储格式最大支持长度为 127 的 Topic  对于 Topic 的长度上限也是 127,代码见 org.apache.rocketmq.common.topic.TopicValidator#validateTopic  如果 Topic 和 Group 很长,超过 127 的限制之后,将导致 pop 消费非预期工作。为了更好的复现这个问题,我编写了一个 Demo。 现象为客户端无法消费到 重试 Topic 总长度超长的重试消息。同时服务端有一些错误日志。 ```Java String nameServerAddr = "xxx.xxx.xxx.xxx:9876"; String topicName = "topic-" + RandomStringUtils.randomAlphabetic(128).toUpperCase(); String groupName = "group-" + RandomStringUtils.randomAlphabetic(128).toUpperCase(); System.out.printf("Use topicName: %s%n", topicName); System.out.printf("Use groupName: %s%n", groupName); switchPop(groupName, topicName); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); consumer.subscribe(topicName, "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setConsumeThreadMin(1); consumer.setConsumeThreadMax(1); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { for (MessageExt message : msgs) { System.out.printf("Listener: %s, messageId: %s, times: %d%n", new Date(System.currentTimeMillis()), message.getMsgId(), message.getReconsumeTimes()); if (message.getReconsumeTimes() == 0) { return ConsumeConcurrentlyStatus.RECONSUME_LATER; } else { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.setNamesrvAddr(nameServerAddr); consumer.setClientRebalance(false); consumer.start(); System.out.printf("Consumer Started...%n"); DefaultMQProducer producer = new DefaultMQProducer( "PID-1", false, null); producer.setAccessChannel(AccessChannel.CLOUD); producer.setNamesrvAddr(nameServerAddr); producer.start(); System.out.printf("Producer Started...%n"); try { Message msg = new Message( topicName, "*", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("Producer send result: %s, messageId: %s%n", topicName, sendResult.getMsgId()); } catch (Exception e) { e.printStackTrace(); } ```   解决方案: 1. 我们计划对存储层进行一个存储格式的简单修改,并将其称为 message version v2。 通过判断消息的 magic code 来区分是旧版本 v1 (长度 128),还是 v2(长度扩展为 256)来支持长 topic 的存储 2. 由于低版本客户端只支持解析 v1 格式的 topic 数据,代码见 org.apache.rocketmq.common.message.MessageDecoder#decode 为了保持对旧版本客户端和多语言旧客户端对于长 topic 的兼容性,不会将这个新的存储格式透传给客户端。 否则旧版本客户端由于不能解析 magic code 会导致消息静默丢弃。 3. 由于长 Topic 在绝大多数场景中仅被用作重试 topic(尤其是 pop 消费)使用,我们可以在 broker 上直接将 pop retry topic 还原为origin topic(原本这一步是在客户端完成),这也意味着在服务端生成 pop ck 消息而不是在客户端,代码见 ```Java messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(queueIdKey), responseHeader.getPopTime(), responseHeader.getInvisibleTime(), responseHeader.getReviveQid(), messageExt.getTopic(), brokerName, messageExt.getQueueId(), msgQueueOffset)); ``` 这个修改将导致 5.0 版本使用 remoting 协议 pop 消费的不兼容。 长期方案: 1. RIP 47 Data Layout V2 中讨论了关于存储格式的长期演进方案,能够彻底的解决上述问题,详细信息请参考邮件列表和文档 https://github.com/apache/rocketmq/wiki/RIP-47-Data-Layout-V2 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
