This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch release-rabbitmq-broker-opencore-202601 in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit cd4c813d30cca973662d5290573775ba30e38223 Author: imzs <[email protected]> AuthorDate: Thu Dec 18 10:18:25 2025 +0800 [RIP-80] #9928 Implementation of Priority Message (#9929) * #9928 Implementation of Priority Message * switch to fastjson2 Change-Id: I3620b00b79a77a93a7cf0fdfac857fb495638ca6 * fix bazel CI, upgrade rocketmq-proto to 2.1.1 Change-Id: Ia66cc7b14b89dc319044ced5ba349315e487c849 * Fix bazel CI * fix CI, temporarily disable popKv in OffsetResetForPopIT Change-Id: I0b406cf0ece5067de430c4404aaa1f2dd46edcba --------- Co-authored-by: RongtongJin <[email protected]> --- WORKSPACE | 2 +- .../rocketmq/broker/pop/PopConsumerService.java | 99 +++++-- .../broker/processor/PopMessageProcessor.java | 11 +- .../broker/processor/PopReviveService.java | 51 +++- .../broker/processor/SendMessageProcessor.java | 10 + .../broker/pop/PopConsumerServiceTest.java | 3 + .../org/apache/rocketmq/common/BrokerConfig.java | 31 +- .../common/SubscriptionGroupAttributes.java | 9 + .../common/attribute/TopicMessageType.java | 11 +- .../apache/rocketmq/common/message/Message.java | 13 + .../rocketmq/common/message/MessageConst.java | 2 + .../common/attribute/TopicMessageTypeTest.java | 19 +- pom.xml | 2 +- .../proxy/grpc/v2/common/GrpcConverter.java | 6 + .../grpc/v2/producer/SendMessageActivity.java | 6 + .../proxy/grpc/v2/route/RouteActivity.java | 2 + .../grpc/v2/producer/SendMessageActivityTest.java | 28 ++ .../subscription/SubscriptionGroupConfig.java | 10 + .../test/client/rmq/RMQNormalProducer.java | 1 + .../rocketmq/test/sendresult/ResultWrapper.java | 10 + .../rocketmq/test/util/MQAdminTestUtils.java | 6 +- .../org/apache/rocketmq/test/base/BaseConf.java | 13 +- .../rocketmq/test/base/IntegrationTestBase.java | 1 + .../test/client/consumer/pop/BasePopNormally.java | 8 + .../test/client/consumer/pop/PopPriorityIT.java | 319 +++++++++++++++++++++ .../rocketmq/test/offset/OffsetResetForPopIT.java | 1 + 26 files changed, 621 insertions(+), 53 deletions(-) diff --git a/WORKSPACE b/WORKSPACE index da58ae6763..254f0051d5 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -71,7 +71,7 @@ maven_install( "org.bouncycastle:bcpkix-jdk15on:1.69", "com.google.code.gson:gson:2.8.9", "com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:1.4.2", - "org.apache.rocketmq:rocketmq-proto:2.0.4", + "org.apache.rocketmq:rocketmq-proto:2.1.1", "com.google.protobuf:protobuf-java:3.20.1", "com.google.protobuf:protobuf-java-util:3.20.1", "com.conversantmedia:disruptor:1.2.10", diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java index 57fac798b2..4a88c6b802 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java @@ -41,9 +41,11 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.PopAckConstants; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicFilterType; +import org.apache.rocketmq.common.attribute.TopicMessageType; import org.apache.rocketmq.common.constant.ConsumeInitMode; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; @@ -54,6 +56,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils; import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.store.AppendMessageStatus; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageStatus; @@ -323,6 +326,23 @@ public class PopConsumerService extends ServiceThread { }); } + protected CompletableFuture<PopConsumerContext> getMessageFromTopicAsync(CompletableFuture<PopConsumerContext> future, + String clientHost, String groupId, String topicId, long requestCount, int batchSize, MessageFilter filter, + PopConsumerRecord.RetryType retryType) { + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topicId); + if (null == topicConfig) { + return future; + } + for (int i = 0; i < topicConfig.getReadQueueNums(); i++) { + long index = (brokerController.getBrokerConfig().isPriorityOrderAsc() ? + topicConfig.getReadQueueNums() - 1 - i : i) + requestCount; + int current = (int) index % topicConfig.getReadQueueNums(); + future = this.getMessageAsync(future, clientHost, groupId, + topicId, current, batchSize, filter, retryType); + } + return future; + } + public CompletableFuture<PopConsumerContext> popAsync(String clientHost, long popTime, long invisibleTime, String groupId, String topicId, int queueId, int batchSize, boolean fifo, String attemptId, int initMode, MessageFilter filter) { @@ -335,6 +355,12 @@ public class PopConsumerService extends ServiceThread { return CompletableFuture.completedFuture(popConsumerContext); } + SubscriptionGroupConfig subscriptionGroupConfig = + this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupId); + if (null == subscriptionGroupConfig || !subscriptionGroupConfig.isConsumeEnable()) { + return CompletableFuture.completedFuture(popConsumerContext); + } + log.debug("PopConsumerService popAsync, groupId={}, topicId={}, queueId={}, " + "batchSize={}, invisibleTime={}, fifo={}, attemptId={}, filter={}", groupId, topicId, queueId, batchSize, invisibleTime, fifo, attemptId, filter); @@ -344,7 +370,13 @@ public class PopConsumerService extends ServiceThread { String retryTopicV2 = KeyBuilder.buildPopRetryTopicV2(topicId, groupId); long requestCount = Objects.requireNonNull(ConcurrentHashMapUtils.computeIfAbsent( requestCountTable, requestKey, k -> new AtomicLong(0L))).getAndIncrement(); - boolean preferRetry = requestCount % 5L == 0L; + boolean usePriorityMode = TopicMessageType.PRIORITY.equals(topicConfig.getTopicMessageType()) + && !fifo && requestCount % 100L < subscriptionGroupConfig.getPriorityFactor(); + int probability = usePriorityMode ? + brokerConfig.getPopFromRetryProbabilityForPriority() : brokerConfig.getPopFromRetryProbability(); + probability = Math.max(0, Math.min(100, probability)); // [51, 100] means always + boolean preferRetry = probability > 0 && requestCount % (100 / probability) == 0L; + requestCount = usePriorityMode ? 0 : requestCount; // use requestCount as randomQ CompletableFuture<PopConsumerContext> getMessageFuture = CompletableFuture.completedFuture(popConsumerContext); @@ -352,13 +384,13 @@ public class PopConsumerService extends ServiceThread { try { if (!fifo && preferRetry) { if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) { - getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId, - retryTopicV1, 0, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V1); + getMessageFuture = this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId, + retryTopicV1, requestCount, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V1); } if (brokerConfig.isEnableRetryTopicV2()) { - getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId, - retryTopicV2, 0, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V2); + getMessageFuture = this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId, + retryTopicV2, requestCount, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V2); } } @@ -366,21 +398,18 @@ public class PopConsumerService extends ServiceThread { getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId, topicId, queueId, batchSize, filter, PopConsumerRecord.RetryType.NORMAL_TOPIC); } else { - for (int i = 0; i < topicConfig.getReadQueueNums(); i++) { - int current = (int) ((requestCount + i) % topicConfig.getReadQueueNums()); - getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId, - topicId, current, batchSize, filter, PopConsumerRecord.RetryType.NORMAL_TOPIC); - } + getMessageFuture = this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId, + topicId, requestCount, batchSize, filter, PopConsumerRecord.RetryType.NORMAL_TOPIC); if (!fifo && !preferRetry) { if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) { - getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId, - retryTopicV1, 0, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V1); + getMessageFuture = this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId, + retryTopicV1, requestCount, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V1); } if (brokerConfig.isEnableRetryTopicV2()) { - getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId, - retryTopicV2, 0, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V2); + getMessageFuture = this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId, + retryTopicV2, requestCount, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V2); } } } @@ -567,21 +596,33 @@ public class PopConsumerService extends ServiceThread { return consumerRecords.size(); } - public void createRetryTopicIfNeeded(String groupId, String topicId) { - TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(topicId); - if (topicConfig != null) { + public void createRetryTopicIfNeeded(String groupId, String retryTopic) { + TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(retryTopic); + if (topicConfig != null && !brokerController.getBrokerConfig().isUseSeparateRetryQueue()) { return; } - topicConfig = new TopicConfig(topicId, 1, 1, + int retryQueueNum = PopAckConstants.retryQueueNum; + if (brokerController.getBrokerConfig().isUseSeparateRetryQueue()) { + String normalTopic = KeyBuilder.parseNormalTopic(retryTopic, groupId); + TopicConfig normalConfig = brokerController.getTopicConfigManager().selectTopicConfig(normalTopic); // always exists + retryQueueNum = normalConfig.getWriteQueueNums(); + if (topicConfig != null && topicConfig.getWriteQueueNums() == normalConfig.getWriteQueueNums()) { + return; + } + } + + topicConfig = new TopicConfig(retryTopic, retryQueueNum, retryQueueNum, PermName.PERM_READ | PermName.PERM_WRITE, 0); topicConfig.setTopicFilterType(TopicFilterType.SINGLE_TAG); brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); - long offset = this.brokerController.getConsumerOffsetManager().queryOffset(groupId, topicId, 0); - if (offset < 0) { - this.brokerController.getConsumerOffsetManager().commitOffset( - "InitPopOffset", groupId, topicId, 0, 0); + for (int i = 0; i < retryQueueNum; i++) { + long offset = this.brokerController.getConsumerOffsetManager().queryOffset(groupId, retryTopic, i); + if (offset < 0) { + this.brokerController.getConsumerOffsetManager().commitOffset( + "InitPopOffset", groupId, retryTopic, i, 0); + } } } @@ -604,7 +645,7 @@ public class PopConsumerService extends ServiceThread { MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(retryTopic); msgInner.setBody(messageExt.getBody() != null ? messageExt.getBody() : new byte[] {}); - msgInner.setQueueId(0); + msgInner.setQueueId(getRetryQueueId(retryTopic, messageExt)); if (messageExt.getTags() != null) { msgInner.setTags(messageExt.getTags()); } else { @@ -646,6 +687,18 @@ public class PopConsumerService extends ServiceThread { return true; } + private int getRetryQueueId(String retryTopic, MessageExt oriMsg) { + if (!brokerController.getBrokerConfig().isUseSeparateRetryQueue()) { + return 0; + } + int oriQueueId = oriMsg.getQueueId(); // original qid of normal or retry topic + if (oriQueueId > brokerController.getTopicConfigManager().selectTopicConfig(retryTopic).getWriteQueueNums() - 1) { + log.warn("not expected, {}, {}, {}", retryTopic, oriQueueId, oriMsg.getMsgId()); + return 0; // fallback + } + return oriQueueId; + } + // Export kv store record to revive topic @SuppressWarnings("ExtractMethodRecommender") public synchronized void transferToFsStore() { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 6e0d235f00..8592283648 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -53,6 +53,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.PopAckConstants; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.attribute.TopicMessageType; import org.apache.rocketmq.common.constant.ConsumeInitMode; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; @@ -512,11 +513,15 @@ public class PopMessageProcessor implements NettyRequestProcessor { // considered the same type because they share the same retry flag in previous fields. // Therefore, needRetryV1 is designed as a subset of needRetry, and within a single request, // only one type of retry topic is able to call popMsgFromQueue. - boolean needRetry = randomQ < brokerConfig.getPopFromRetryProbability(); + boolean usePriorityMode = TopicMessageType.PRIORITY.equals(topicConfig.getTopicMessageType()) + && !requestHeader.isOrder() && randomQ < subscriptionGroupConfig.getPriorityFactor(); + boolean needRetry = randomQ < (usePriorityMode ? + brokerConfig.getPopFromRetryProbabilityForPriority() : brokerConfig.getPopFromRetryProbability()); boolean needRetryV1 = false; if (brokerConfig.isEnableRetryTopicV2() && brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) { needRetryV1 = randomQ % 2 == 0; } + randomQ = usePriorityMode ? 0 : randomQ; // reset randomQ long popTime = System.currentTimeMillis(); CompletableFuture<Long> getMessageFuture = CompletableFuture.completedFuture(0L); if (needRetry && !requestHeader.isOrder()) { @@ -653,7 +658,9 @@ public class PopMessageProcessor implements NettyRequestProcessor { StringBuilder msgOffsetInfo, StringBuilder orderCountInfo, int randomQ, CompletableFuture<Long> getMessageFuture) { if (topicConfig != null) { for (int i = 0; i < topicConfig.getReadQueueNums(); i++) { - int queueId = (randomQ + i) % topicConfig.getReadQueueNums(); + int index = (brokerController.getBrokerConfig().isPriorityOrderAsc() ? + topicConfig.getReadQueueNums() - 1 - i : i) + randomQ; + int queueId = index % topicConfig.getReadQueueNums(); getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(topicConfig.getTopicName(), requestHeader.getAttemptId(), isRetry, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, messageFilter, diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index aa7d87505e..87b37cc39b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -111,7 +111,6 @@ public class PopReviveService extends ServiceThread { msgInner.setTopic(popCheckPoint.getTopic()); } msgInner.setBody(messageExt.getBody()); - msgInner.setQueueId(0); if (messageExt.getTags() != null) { msgInner.setTags(messageExt.getTags()); } else { @@ -130,6 +129,7 @@ public class PopReviveService extends ServiceThread { msgInner.getProperties().put(MessageConst.PROPERTY_ORIGIN_GROUP, popCheckPoint.getCId()); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); addRetryTopicIfNotExist(msgInner.getTopic(), popCheckPoint.getCId()); + msgInner.setQueueId(getRetryQueueId(msgInner.getTopic(), messageExt)); PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner); brokerController.getBrokerMetricsManager().getPopMetricsManager().incPopReviveRetryMessageCount(popCheckPoint, putMessageResult.getPutMessageStatus()); if (brokerController.getBrokerConfig().isEnablePopLog()) { @@ -149,30 +149,55 @@ public class PopReviveService extends ServiceThread { return true; } - private void initPopRetryOffset(String topic, String consumerGroup) { - long offset = this.brokerController.getConsumerOffsetManager().queryOffset(consumerGroup, topic, 0); - if (offset < 0) { - this.brokerController.getConsumerOffsetManager().commitOffset("initPopRetryOffset", consumerGroup, topic, - 0, 0); + private void initPopRetryOffset(String retryTopic, String consumerGroup, int retryQueueNum) { + for (int i = 0; i < retryQueueNum; i++) { + long offset = this.brokerController.getConsumerOffsetManager().queryOffset(consumerGroup, retryTopic, i); + if (offset < 0) { + this.brokerController.getConsumerOffsetManager().commitOffset("initPopRetryOffset", consumerGroup, retryTopic, i, 0); + } } } - public void addRetryTopicIfNotExist(String topic, String consumerGroup) { + public void addRetryTopicIfNotExist(String retryTopic, String consumerGroup) { if (brokerController != null) { - TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(topic); - if (topicConfig != null) { + TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(retryTopic); + if (topicConfig != null && !brokerController.getBrokerConfig().isUseSeparateRetryQueue()) { return; } - topicConfig = new TopicConfig(topic); - topicConfig.setReadQueueNums(PopAckConstants.retryQueueNum); - topicConfig.setWriteQueueNums(PopAckConstants.retryQueueNum); + + int retryQueueNum = PopAckConstants.retryQueueNum; + if (brokerController.getBrokerConfig().isUseSeparateRetryQueue()) { + String normalTopic = KeyBuilder.parseNormalTopic(retryTopic, consumerGroup); + TopicConfig normalConfig = brokerController.getTopicConfigManager().selectTopicConfig(normalTopic); // always exists + retryQueueNum = normalConfig.getWriteQueueNums(); + if (topicConfig != null && topicConfig.getWriteQueueNums() == normalConfig.getWriteQueueNums()) { + return; + } + } + + // create new one, or update in case of queue expansion + topicConfig = new TopicConfig(retryTopic); + topicConfig.setReadQueueNums(retryQueueNum); + topicConfig.setWriteQueueNums(retryQueueNum); topicConfig.setTopicFilterType(TopicFilterType.SINGLE_TAG); topicConfig.setPerm(6); topicConfig.setTopicSysFlag(0); brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); - initPopRetryOffset(topic, consumerGroup); + initPopRetryOffset(retryTopic, consumerGroup, retryQueueNum); + } + } + + private int getRetryQueueId(String retryTopic, MessageExt messageExt) { + if (!brokerController.getBrokerConfig().isUseSeparateRetryQueue()) { + return 0; + } + int oriQueueId = messageExt.getQueueId(); // original qid of normal or retry topic + if (oriQueueId > brokerController.getTopicConfigManager().selectTopicConfig(retryTopic).getWriteQueueNums() - 1) { + POP_LOGGER.warn("not expected, {}, {}, {}", retryTopic, oriQueueId, messageExt.getMsgId()); + return 0; // fallback } + return oriQueueId; } protected List<MessageExt> getReviveMessage(long offset, int queueId) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index eefdb85ccf..c8e7e4c128 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -281,6 +281,16 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } MessageAccessor.setProperties(msgInner, oriProps); + // check properties to ensure exclusive, don't check topic meta config to keep the behavior consistent + int msgPriority = msgInner.getPriority(); + if (msgPriority >= 0) { + if (TopicMessageType.PRIORITY.equals(TopicMessageType.parseFromMessageProperty(msgInner.getProperties()))) { + queueIdInt = Math.min(msgPriority, topicConfig.getWriteQueueNums() - 1); + msgInner.setQueueId(queueIdInt); + } else { + MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_PRIORITY); + } + } CleanupPolicy cleanupPolicy = CleanupPolicyUtils.getDeletePolicy(Optional.of(topicConfig)); if (Objects.equals(cleanupPolicy, CleanupPolicy.COMPACTION)) { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java index 9c23a8625e..8fb1b7a8c4 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java @@ -37,6 +37,7 @@ import org.apache.rocketmq.broker.longpolling.PopLongPollingService; import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager; import org.apache.rocketmq.broker.processor.PopMessageProcessor; +import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.KeyBuilder; @@ -93,6 +94,7 @@ public class PopConsumerServiceTest { messageStoreConfig.setStorePathRootDir(filePath); TopicConfigManager topicConfigManager = Mockito.mock(TopicConfigManager.class); + SubscriptionGroupManager subscriptionGroupManager = Mockito.mock(SubscriptionGroupManager.class); ConsumerOffsetManager consumerOffsetManager = Mockito.mock(ConsumerOffsetManager.class); PopMessageProcessor popMessageProcessor = Mockito.mock(PopMessageProcessor.class); PopLongPollingService popLongPollingService = Mockito.mock(PopLongPollingService.class); @@ -101,6 +103,7 @@ public class PopConsumerServiceTest { brokerController = Mockito.mock(BrokerController.class); Mockito.when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); Mockito.when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager); + Mockito.when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager); Mockito.when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); Mockito.when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager); Mockito.when(brokerController.getPopMessageProcessor()).thenReturn(popMessageProcessor); diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index a46435543a..5142ed12be 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -240,13 +240,18 @@ public class BrokerConfig extends BrokerIdentity { private boolean retrieveMessageFromPopRetryTopicV1 = true; private boolean enableRetryTopicV2 = false; private int popFromRetryProbability = 20; + // pop retry probability for priority mode + private int popFromRetryProbabilityForPriority = 0; + // 0 as the lowest priority if true + private boolean priorityOrderAsc = true; private boolean popConsumerFSServiceInit = true; private boolean popConsumerKVServiceLog = false; private boolean popConsumerKVServiceInit = false; private boolean popConsumerKVServiceEnable = false; private int popReviveMaxReturnSizePerRead = 16 * 1024; private int popReviveMaxAttemptTimes = 16; - + // each message queue will have a corresponding retry queue + private boolean useSeparateRetryQueue = false; private boolean realTimeNotifyConsumerChange = true; private boolean litePullMessageEnable = true; @@ -2177,4 +2182,28 @@ public class BrokerConfig extends BrokerIdentity { public void setSplitMetadataSize(int splitMetadataSize) { this.splitMetadataSize = splitMetadataSize; } + + public int getPopFromRetryProbabilityForPriority() { + return popFromRetryProbabilityForPriority; + } + + public void setPopFromRetryProbabilityForPriority(int popFromRetryProbabilityForPriority) { + this.popFromRetryProbabilityForPriority = popFromRetryProbabilityForPriority; + } + + public boolean isPriorityOrderAsc() { + return priorityOrderAsc; + } + + public void setPriorityOrderAsc(boolean priorityOrderAsc) { + this.priorityOrderAsc = priorityOrderAsc; + } + + public boolean isUseSeparateRetryQueue() { + return useSeparateRetryQueue; + } + + public void setUseSeparateRetryQueue(boolean useSeparateRetryQueue) { + this.useSeparateRetryQueue = useSeparateRetryQueue; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/SubscriptionGroupAttributes.java b/common/src/main/java/org/apache/rocketmq/common/SubscriptionGroupAttributes.java index 5b0072401c..845f407939 100644 --- a/common/src/main/java/org/apache/rocketmq/common/SubscriptionGroupAttributes.java +++ b/common/src/main/java/org/apache/rocketmq/common/SubscriptionGroupAttributes.java @@ -19,11 +19,20 @@ package org.apache.rocketmq.common; import java.util.HashMap; import java.util.Map; import org.apache.rocketmq.common.attribute.Attribute; +import org.apache.rocketmq.common.attribute.LongRangeAttribute; public class SubscriptionGroupAttributes { public static final Map<String, Attribute> ALL; + public static final LongRangeAttribute PRIORITY_FACTOR_ATTRIBUTE = new LongRangeAttribute( + "priority.factor", + true, + 0, // disable priority mode + 100, // enable priority mode + 100 + ); static { ALL = new HashMap<>(); + ALL.put(PRIORITY_FACTOR_ATTRIBUTE.getName(), PRIORITY_FACTOR_ATTRIBUTE); } } diff --git a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java index 5e581a34ee..9a89d30e8f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java +++ b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java @@ -28,6 +28,7 @@ public enum TopicMessageType { FIFO("FIFO"), DELAY("DELAY"), TRANSACTION("TRANSACTION"), + PRIORITY("PRIORITY"), MIXED("MIXED"); private final String value; @@ -36,7 +37,8 @@ public enum TopicMessageType { } public static Set<String> topicMessageTypeSet() { - return Sets.newHashSet(UNSPECIFIED.value, NORMAL.value, FIFO.value, DELAY.value, TRANSACTION.value, MIXED.value); + return Sets.newHashSet(UNSPECIFIED.value, NORMAL.value, FIFO.value, DELAY.value, TRANSACTION.value, + PRIORITY.value, MIXED.value); } public String getValue() { @@ -44,9 +46,8 @@ public enum TopicMessageType { } public static TopicMessageType parseFromMessageProperty(Map<String, String> messageProperty) { - String isTrans = messageProperty.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); - String isTransValue = "true"; - if (isTransValue.equals(isTrans)) { + // the parse order keeps message types mutually exclusive + if (Boolean.parseBoolean(messageProperty.get(MessageConst.PROPERTY_TRANSACTION_PREPARED))) { return TopicMessageType.TRANSACTION; } else if (messageProperty.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null || messageProperty.get(MessageConst.PROPERTY_TIMER_DELIVER_MS) != null @@ -55,6 +56,8 @@ public enum TopicMessageType { return TopicMessageType.DELAY; } else if (messageProperty.get(MessageConst.PROPERTY_SHARDING_KEY) != null) { return TopicMessageType.FIFO; + } else if (messageProperty.get(MessageConst.PROPERTY_PRIORITY) != null) { + return TopicMessageType.PRIORITY; } return TopicMessageType.NORMAL; } diff --git a/common/src/main/java/org/apache/rocketmq/common/message/Message.java b/common/src/main/java/org/apache/rocketmq/common/message/Message.java index acd4df96d2..b64f3520c1 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/Message.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/Message.java @@ -16,6 +16,8 @@ */ package org.apache.rocketmq.common.message; +import org.apache.commons.lang3.math.NumberUtils; + import java.io.Serializable; import java.util.Arrays; import java.util.Collection; @@ -154,6 +156,17 @@ public class Message implements Serializable { this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level)); } + public void setPriority(int priority) { + if (priority < 0) { + throw new IllegalArgumentException("The priority must be greater than or equal to 0"); + } + this.putProperty(MessageConst.PROPERTY_PRIORITY, String.valueOf(priority)); + } + + public int getPriority() { + return NumberUtils.toInt(this.getProperty(MessageConst.PROPERTY_PRIORITY), -1); + } + public boolean isWaitStoreMsgOK() { String result = this.getProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK); if (null == result) { diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java index 2bdaabebae..c88e57d696 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java @@ -43,6 +43,7 @@ public class MessageConst { public static final String PROPERTY_EXTEND_UNIQ_INFO = "EXTEND_UNIQ_INFO"; public static final String PROPERTY_MAX_RECONSUME_TIMES = "MAX_RECONSUME_TIMES"; public static final String PROPERTY_CONSUME_START_TIMESTAMP = "CONSUME_START_TIME"; + public static final String PROPERTY_PRIORITY = "_SYS_MSG_PRIORITY_"; public static final String PROPERTY_INNER_NUM = "INNER_NUM"; public static final String PROPERTY_INNER_BASE = "INNER_BASE"; public static final String DUP_INFO = "DUP_INFO"; @@ -159,5 +160,6 @@ public class MessageConst { STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_TOPIC); STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_MESSAGE_ID); STRING_HASH_SET.add(PROPERTY_CRC32); + STRING_HASH_SET.add(PROPERTY_PRIORITY); } } diff --git a/common/src/test/java/org/apache/rocketmq/common/attribute/TopicMessageTypeTest.java b/common/src/test/java/org/apache/rocketmq/common/attribute/TopicMessageTypeTest.java index 0321679ccc..79402ca1b2 100644 --- a/common/src/test/java/org/apache/rocketmq/common/attribute/TopicMessageTypeTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/attribute/TopicMessageTypeTest.java @@ -33,6 +33,7 @@ public class TopicMessageTypeTest { private Map<String, String> transactionMessageProperty; private Map<String, String> delayMessageProperty; private Map<String, String> fifoMessageProperty; + private Map<String, String> priorityMessageProperty; @Before public void setUp() { @@ -40,15 +41,18 @@ public class TopicMessageTypeTest { transactionMessageProperty = new HashMap<>(); delayMessageProperty = new HashMap<>(); fifoMessageProperty = new HashMap<>(); + priorityMessageProperty = new HashMap<>(); transactionMessageProperty.put(MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); delayMessageProperty.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "1"); fifoMessageProperty.put(MessageConst.PROPERTY_SHARDING_KEY, "shardingKey"); + priorityMessageProperty.put(MessageConst.PROPERTY_PRIORITY, "3"); } @Test public void testTopicMessageTypeSet() { - Set<String> expectedSet = Sets.newHashSet("UNSPECIFIED", "NORMAL", "FIFO", "DELAY", "TRANSACTION", "MIXED"); + Set<String> expectedSet + = Sets.newHashSet("UNSPECIFIED", "NORMAL", "FIFO", "DELAY", "TRANSACTION", "PRIORITY", "MIXED"); Set<String> actualSet = TopicMessageType.topicMessageTypeSet(); assertEquals(expectedSet, actualSet); } @@ -77,6 +81,12 @@ public class TopicMessageTypeTest { assertEquals(TopicMessageType.FIFO, actual); } + @Test + public void testParseFromMessageProperty_Priority() { + TopicMessageType actual = TopicMessageType.parseFromMessageProperty(priorityMessageProperty); + assertEquals(TopicMessageType.PRIORITY, actual); + } + @Test public void testGetMetricsValue() { for (TopicMessageType type : TopicMessageType.values()) { @@ -116,6 +126,13 @@ public class TopicMessageTypeTest { properties.put(MessageConst.PROPERTY_SHARDING_KEY, "sharding_key"); Assert.assertEquals(TopicMessageType.FIFO, TopicMessageType.parseFromMessageProperty(properties)); + // PRIORITY + properties.clear(); + properties.put(MessageConst.PROPERTY_PRIORITY, "3"); + Assert.assertEquals(TopicMessageType.PRIORITY, TopicMessageType.parseFromMessageProperty(properties)); + properties.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3"); + Assert.assertEquals(TopicMessageType.DELAY, TopicMessageType.parseFromMessageProperty(properties)); + // NORMAL properties.clear(); Assert.assertEquals(TopicMessageType.NORMAL, TopicMessageType.parseFromMessageProperty(properties)); diff --git a/pom.xml b/pom.xml index 56830f15a3..48934f6da6 100644 --- a/pom.xml +++ b/pom.xml @@ -127,7 +127,7 @@ <annotations-api.version>6.0.53</annotations-api.version> <extra-enforcer-rules.version>1.0-beta-4</extra-enforcer-rules.version> <concurrentlinkedhashmap-lru.version>1.4.2</concurrentlinkedhashmap-lru.version> - <rocketmq-proto.version>2.0.4</rocketmq-proto.version> + <rocketmq-proto.version>2.1.1</rocketmq-proto.version> <grpc.version>1.53.0</grpc.version> <protobuf.version>3.20.1</protobuf.version> <disruptor.version>1.2.10</disruptor.version> diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java index 33a4e1312f..4ce3dc831d 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/common/GrpcConverter.java @@ -212,6 +212,12 @@ public class GrpcConverter { } } + // priority + int priority = messageExt.getPriority(); + if (priority >= 0) { + systemPropertiesBuilder.setPriority(priority); + } + // sharding key String shardingKey = messageExt.getProperty(MessageConst.PROPERTY_SHARDING_KEY); if (shardingKey != null) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java index f7b8014bb9..2c3ffd1305 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java @@ -262,6 +262,12 @@ public class SendMessageActivity extends AbstractMessingActivity { // set delay level or deliver timestamp fillDelayMessageProperty(message, messageWithHeader); + // set priority + if (message.getSystemProperties().hasPriority()) { + int priority = message.getSystemProperties().getPriority(); + messageWithHeader.setPriority(priority); + } + // set reconsume times int reconsumeTimes = message.getSystemProperties().getDeliveryAttempt(); MessageAccessor.setReconsumeTime(messageWithHeader, String.valueOf(reconsumeTimes)); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java index 20ae3aa6c8..92883aaa55 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java @@ -308,6 +308,8 @@ public class RouteActivity extends AbstractMessingActivity { return Collections.singletonList(MessageType.TRANSACTION); case DELAY: return Collections.singletonList(MessageType.DELAY); + case PRIORITY: + return Collections.singletonList(MessageType.PRIORITY); case MIXED: return Arrays.asList(MessageType.NORMAL, MessageType.FIFO, MessageType.DELAY, MessageType.TRANSACTION); default: diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java index 4882a5ed8b..a64867ddfe 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java @@ -268,6 +268,34 @@ public class SendMessageActivityTest extends BaseActivityTest { assertEquals(MessageSysFlag.TRANSACTION_PREPARED_TYPE | MessageSysFlag.COMPRESSED_FLAG, sendMessageActivity.buildSysFlag(message)); } + @Test + public void testPriorityMessage() { + String msgId = MessageClientIDSetter.createUniqID(); + Message message = Message.newBuilder() + .setTopic(Resource.newBuilder() + .setName(TOPIC) + .build()) + .setSystemProperties(SystemProperties.newBuilder() + .setMessageId(msgId) + .setQueueId(0) + .setMessageType(MessageType.PRIORITY) + .setPriority(5) + .setBodyEncoding(Encoding.GZIP) + .setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis())) + .setBornHost(StringUtils.defaultString(NetworkUtil.getLocalAddress(), "127.0.0.1:1234")) + .build()) + .setBody(ByteString.copyFromUtf8("123")) + .build(); + org.apache.rocketmq.common.message.Message messageExt = this.sendMessageActivity.buildMessage(null, + Lists.newArrayList( + message + ), + Resource.newBuilder().setName(TOPIC).build()).get(0); + + assertEquals(MessageClientIDSetter.getUniqID(messageExt), msgId); + assertEquals(5, messageExt.getPriority()); + } + @Test public void testSendOrderMessageQueueSelector() throws Exception { TopicRouteData topicRouteData = new TopicRouteData(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java index c9c2a8090c..2c3738a464 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java @@ -17,13 +17,17 @@ package org.apache.rocketmq.remoting.protocol.subscription; +import com.alibaba.fastjson2.annotation.JSONField; import com.google.common.base.MoreObjects; import java.util.HashMap; import java.util.Map; import java.util.Set; import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.math.NumberUtils; import org.apache.rocketmq.common.MixAll; +import static org.apache.rocketmq.common.SubscriptionGroupAttributes.PRIORITY_FACTOR_ATTRIBUTE; + public class SubscriptionGroupConfig { private String groupName; @@ -173,6 +177,12 @@ public class SubscriptionGroupConfig { this.attributes = attributes; } + @JSONField(serialize = false, deserialize = false) + public long getPriorityFactor() { + String factorStr = null == attributes ? null : attributes.get(PRIORITY_FACTOR_ATTRIBUTE.getName()); + return NumberUtils.toLong(factorStr, PRIORITY_FACTOR_ATTRIBUTE.getDefaultValue()); + } + @Override public int hashCode() { final int prime = 31; diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java index 7df189a915..75444d3a1f 100644 --- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java @@ -110,6 +110,7 @@ public class RMQNormalProducer extends AbstractMQProducer { msgBodys.addData(new String(message.getBody(), StandardCharsets.UTF_8)); originMsgs.addData(msg); originMsgIndex.put(new String(message.getBody(), StandardCharsets.UTF_8), internalSendResult); + sendResult.setSendResultObj(internalSendResult); } catch (Exception e) { if (isDebug) { e.printStackTrace(); diff --git a/test/src/main/java/org/apache/rocketmq/test/sendresult/ResultWrapper.java b/test/src/main/java/org/apache/rocketmq/test/sendresult/ResultWrapper.java index 9fe31463e4..d9a5987ff4 100644 --- a/test/src/main/java/org/apache/rocketmq/test/sendresult/ResultWrapper.java +++ b/test/src/main/java/org/apache/rocketmq/test/sendresult/ResultWrapper.java @@ -17,11 +17,14 @@ package org.apache.rocketmq.test.sendresult; +import org.apache.rocketmq.client.producer.SendResult; + public class ResultWrapper { private boolean sendResult = false; private String msgId = null; private Exception sendException = null; private String brokerIp = null; + private SendResult sendResultObj = null; public String getBrokerIp() { return brokerIp; @@ -55,6 +58,13 @@ public class ResultWrapper { this.sendException = sendException; } + public SendResult getSendResultObj() { + return sendResultObj; + } + public void setSendResultObj(SendResult sendResultObj) { + this.sendResultObj = sendResultObj; + } + @Override public String toString() { return String.format("sendstatus:%s msgId:%s", sendResult, msgId); diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java index 276d08d806..3b6154ae6b 100644 --- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java +++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java @@ -104,12 +104,10 @@ public class MQAdminTestUtils { return createResult; } - public static boolean createSub(String nameSrvAddr, String clusterName, String consumerId) { + public static boolean createSub(String nameSrvAddr, String clusterName, SubscriptionGroupConfig config) { boolean createResult = true; DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(); mqAdminExt.setNamesrvAddr(nameSrvAddr); - SubscriptionGroupConfig config = new SubscriptionGroupConfig(); - config.setGroupName(consumerId); try { mqAdminExt.start(); Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdminExt, @@ -117,7 +115,7 @@ public class MQAdminTestUtils { for (String addr : masterSet) { try { mqAdminExt.createAndUpdateSubscriptionGroupConfig(addr, config); - log.info("create subscription group {} to {} success.", consumerId, addr); + log.info("create subscription group {} to {} success.", config.getGroupName(), addr); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000 * 1); diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java index 472e106ce3..50741ba091 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java @@ -40,6 +40,7 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.namesrv.NamesrvController; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer; import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; @@ -203,15 +204,21 @@ public class BaseConf { } public static String initConsumerGroup() { - String group = MQRandomUtils.getRandomConsumerGroup(); - return initConsumerGroup(group); + return initConsumerGroup(MQRandomUtils.getRandomConsumerGroup()); } public static String initConsumerGroup(String group) { - MQAdminTestUtils.createSub(NAMESRV_ADDR, CLUSTER_NAME, group); + SubscriptionGroupConfig config = new SubscriptionGroupConfig(); + config.setGroupName(group); + MQAdminTestUtils.createSub(NAMESRV_ADDR, CLUSTER_NAME, config); return group; } + public static String initConsumerGroup(SubscriptionGroupConfig config) { + MQAdminTestUtils.createSub(NAMESRV_ADDR, CLUSTER_NAME, config); + return config.getGroupName(); + } + public static DefaultMQAdminExt getAdmin(String nsAddr) { final DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(3 * 1000); mqAdminExt.setNamesrvAddr(nsAddr); diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java index 287e54d561..cfcb989649 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java @@ -141,6 +141,7 @@ public class IntegrationTestBase { brokerConfig.setRecallMessageEnable(true); storeConfig.setEnableConsumeQueueExt(true); brokerConfig.setLoadBalancePollNameServerInterval(500); + brokerConfig.setPopConsumerKVServiceInit(true); storeConfig.setStorePathRootDir(baseDir); storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog"); storeConfig.setMappedFileSizeCommitLog(commitLogSize); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java index 2e29b95a5a..d4a1b3be5a 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java @@ -18,11 +18,15 @@ package org.apache.rocketmq.test.client.consumer.pop; import java.util.concurrent.CompletableFuture; + +import org.apache.rocketmq.client.consumer.AckResult; import org.apache.rocketmq.client.consumer.PopResult; import org.apache.rocketmq.common.attribute.CQType; import org.apache.rocketmq.common.attribute.TopicMessageType; import org.apache.rocketmq.common.constant.ConsumeInitMode; import org.apache.rocketmq.common.filter.ExpressionType; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.test.base.IntegrationTestBase; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; @@ -69,4 +73,8 @@ public class BasePopNormally extends BasePop { brokerAddr, messageQueue, invisibleTime, maxNums, group, 3000, false, ConsumeInitMode.MIN, false, ExpressionType.TAG, "*"); } + + protected CompletableFuture<AckResult> ackMessageAsync(MessageExt messageExt) { + return client.ackMessageAsync(brokerAddr, topic, group, messageExt.getProperty(MessageConst.PROPERTY_POP_CK)); + } } diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopPriorityIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopPriorityIT.java new file mode 100644 index 0000000000..98f7ae55bd --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopPriorityIT.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.pop; + +import org.apache.rocketmq.client.consumer.PopResult; +import org.apache.rocketmq.client.consumer.PopStatus; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.attribute.AttributeParser; +import org.apache.rocketmq.common.attribute.CQType; +import org.apache.rocketmq.common.attribute.TopicMessageType; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.test.base.IntegrationTestBase; +import org.apache.rocketmq.test.util.TestUtil; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.apache.rocketmq.common.SubscriptionGroupAttributes.PRIORITY_FACTOR_ATTRIBUTE; +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class PopPriorityIT extends BasePopNormally { + + private final boolean popConsumerKVServiceEnable; + private final boolean priorityOrderAsc; + private int writeQueueNum = 8; + + public PopPriorityIT(boolean popConsumerKVServiceEnable, boolean priorityOrderAsc) { + this.popConsumerKVServiceEnable = popConsumerKVServiceEnable; + this.priorityOrderAsc = priorityOrderAsc; + } + + @Parameterized.Parameters + public static List<Object[]> params() { + List<Object[]> result = new ArrayList<>(); + result.add(new Object[] {false, true}); + result.add(new Object[] {false, false}); + result.add(new Object[] {true, true}); + result.add(new Object[] {true, false}); + return result; + } + + @Before + public void setUp() { + super.setUp(); + // reset default config if changed + writeQueueNum = 8; + brokerController1.getBrokerConfig().setPopFromRetryProbabilityForPriority(0); + brokerController1.getBrokerConfig().setUseSeparateRetryQueue(false); + brokerController1.getBrokerConfig().setPopConsumerKVServiceEnable(popConsumerKVServiceEnable); + brokerController1.getBrokerConfig().setPriorityOrderAsc(priorityOrderAsc); + IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER1_NAME, writeQueueNum, CQType.SimpleCQ, TopicMessageType.PRIORITY); + } + + @After + public void tearDown() { + super.tearDown(); + } + + @Test + public void test_normal_send() { + int priority = -1; // normal message + Set<Integer> queueIdSet = new HashSet<>(); + for (int i = 0; i < 32; i++) { + Message message = mockMessage(topic, priority, ""); + SendResult sendResult = producer.send(message, null).getSendResultObj(); + queueIdSet.add(sendResult.getMessageQueue().getQueueId()); + } + assertTrue(queueIdSet.size() > 1); + } + + @Test + public void test_priority_send() { + final int priority = 0; // priority message + for (int i = 0; i < 32; i++) { + Message message = mockMessage(topic, priority, ""); + SendResult sendResult = producer.send(message, null).getSendResultObj(); + assertEquals(priority, sendResult.getMessageQueue().getQueueId()); + } + } + + @Test + public void test_priority_consume_always_high_priority() throws Exception { + int msgNumPerQueue = 20; + final int maxPriority = priorityOrderAsc ? writeQueueNum - 1 : 0; + for (int i = 0; i < writeQueueNum; i++) { + Message message = mockMessage(topic, i, String.valueOf(i)); + for (int j = 0; j < msgNumPerQueue; j++) { + producer.send(message); + } + } + Assert.assertTrue(awaitDispatchMs(2000)); + for (int i = 0; i < msgNumPerQueue; i++) { + PopResult popResult = popMessageAsync(Duration.ofSeconds(600).toMillis(), 1, 30000).get(); + TestUtil.waitForMonment(20); // wait lock release + assertEquals(PopStatus.FOUND, popResult.getPopStatus()); + MessageExt message = popResult.getMsgFoundList().get(0); + assertEquals(maxPriority, message.getPriority()); // not a coincidence + } + } + + @Test + public void test_priority_consume_from_high_to_low() throws Exception { + for (int i = 0; i < writeQueueNum; i++) { + Message message = mockMessage(topic, i, String.valueOf(i)); + producer.send(message); + } + Assert.assertTrue(awaitDispatchMs(2000)); + for (int i = 0; i < writeQueueNum; i++) { + PopResult popResult = popMessageAsync(Duration.ofSeconds(30).toMillis(), 1, 30000).get(); + assertEquals(PopStatus.FOUND, popResult.getPopStatus()); + MessageExt message = popResult.getMsgFoundList().get(0); + int expectPriority = priorityOrderAsc ? writeQueueNum - 1 - i : i; + assertEquals(0, message.getQueueOffset()); + assertEquals(expectPriority, message.getQueueId()); + assertEquals(expectPriority, message.getPriority()); + } + } + + @Test + public void test_priority_consume_disable() throws Exception { + SubscriptionGroupConfig config = new SubscriptionGroupConfig(); + config.setGroupName(group); + config.setAttributes(AttributeParser.parseToMap("+" + PRIORITY_FACTOR_ATTRIBUTE.getName() + "=0")); + initConsumerGroup(config); + + int msgNumPerQueue = 200; + for (int i = 0; i < writeQueueNum; i++) { + Message message = mockMessage(topic, i, String.valueOf(i)); + for (int j = 0; j < msgNumPerQueue; j++) { + producer.send(message); + } + } + Assert.assertTrue(awaitDispatchMs(2000)); + int sampleCount = 800; + int[] queueIdCount = new int[writeQueueNum]; + for (int i = 0; i < sampleCount; i++) { + PopResult popResult = popMessageAsync(Duration.ofSeconds(600).toMillis(), 1, 30000).get(); + TestUtil.waitForMonment(10); // wait lock release + assertEquals(PopStatus.FOUND, popResult.getPopStatus()); + MessageExt message = popResult.getMsgFoundList().get(0); + queueIdCount[message.getQueueId()] = queueIdCount[message.getQueueId()] + 1; + } + + double expectAverage = (double) sampleCount / writeQueueNum; + for (int count : queueIdCount) { + assertTrue(Math.abs(count - expectAverage) < expectAverage * 0.4); + } + } + + @Test + public void test_priority_consume_retry_as_lowest() throws Exception { + // retry as lowest by default + int count = 100; + for (int i = 0; i < count; i++) { + Message message = mockMessage(topic, new Random().nextInt(writeQueueNum), String.valueOf(i)); + producer.send(message); + } + int invisibleTime = 3; + PopResult popResult = popMessageAsync(Duration.ofSeconds(invisibleTime).toMillis(), 1, 30000).get(); + assertEquals(PopStatus.FOUND, popResult.getPopStatus()); + String retryId = popResult.getMsgFoundList().get(0).getMsgId(); + TestUtil.waitForSeconds(invisibleTime + 3); + Assert.assertTrue(awaitDispatchMs(2000)); + + List<MessageExt> collect = new ArrayList<>(); + await() + .pollInterval(1, TimeUnit.SECONDS) + .atMost(35, TimeUnit.SECONDS) + .until(() -> { + PopResult result = popMessageAsync(Duration.ofSeconds(600).toMillis(), 32, 5000).get(); + if (PopStatus.FOUND.equals(result.getPopStatus())) { + collect.addAll(result.getMsgFoundList()); + return false; + } + return true; + }); + + assertEquals(count, collect.size()); + assertEquals(1, collect.get(collect.size() - 1).getReconsumeTimes()); + assertEquals(retryId, collect.get(collect.size() - 1).getMsgId()); + } + + @Test + public void test_priority_consume_retry_as_highest() throws Exception { + brokerController1.getBrokerConfig().setPopFromRetryProbabilityForPriority(100); + int count = 100; + for (int i = 0; i < count; i++) { + Message message = mockMessage(topic, new Random().nextInt(writeQueueNum), String.valueOf(i)); + producer.send(message); + } + int invisibleTime = 3; + PopResult popResult = popMessageAsync(Duration.ofSeconds(invisibleTime).toMillis(), 1, 30000).get(); + assertEquals(PopStatus.FOUND, popResult.getPopStatus()); + String retryId = popResult.getMsgFoundList().get(0).getMsgId(); + TestUtil.waitForSeconds(invisibleTime + 3); + Assert.assertTrue(awaitDispatchMs(2000)); + + List<MessageExt> collect = new ArrayList<>(); + await() + .pollInterval(1, TimeUnit.SECONDS) + .atMost(35, TimeUnit.SECONDS) + .until(() -> { + PopResult result = popMessageAsync(Duration.ofSeconds(600).toMillis(), 32, 5000).get(); + if (PopStatus.FOUND.equals(result.getPopStatus())) { + collect.addAll(result.getMsgFoundList()); + return false; + } + return true; + }); + + assertEquals(count, collect.size()); + assertEquals(1, collect.get(0).getReconsumeTimes()); + assertEquals(retryId, collect.get(0).getMsgId()); + } + + @Test + public void test_priority_consume_use_separate_retry_queue() throws Exception { + brokerController1.getBrokerConfig().setUseSeparateRetryQueue(true); + brokerController1.getBrokerConfig().setPopFromRetryProbabilityForPriority(100); + for (int i = 0; i < writeQueueNum; i++) { + Message message = mockMessage(topic, i, String.valueOf(i)); + producer.send(message); + } + Assert.assertTrue(awaitDispatchMs(2000)); + int invisibleTime = 3; + PopResult popResult = popMessageAsync(Duration.ofSeconds(invisibleTime).toMillis(), writeQueueNum, 30000).get(); + assertEquals(PopStatus.FOUND, popResult.getPopStatus()); + assertEquals(writeQueueNum, popResult.getMsgFoundList().size()); + TestUtil.waitForSeconds(invisibleTime + 3); + + popResult = popMessageAsync(Duration.ofSeconds(600).toMillis(), 32, 10000).get(); + assertEquals(PopStatus.FOUND, popResult.getPopStatus()); + assertEquals(writeQueueNum, popResult.getMsgFoundList().size()); + for (int i = 0; i < writeQueueNum; i++) { + MessageExt message = popResult.getMsgFoundList().get(i); + assertEquals(0, message.getQueueOffset()); // means a separate retry queue + assertEquals(1, message.getReconsumeTimes()); + int expectPriority = priorityOrderAsc ? writeQueueNum - 1 - i : i; + assertEquals(expectPriority, message.getQueueId()); + assertEquals(expectPriority, message.getPriority()); + } + } + + @Test + public void test_priority_consume_use_separate_retry_queue_with_queue_expansion() throws Exception { + // retry as lowest by default + brokerController1.getBrokerConfig().setUseSeparateRetryQueue(true); + for (int i = 0; i < writeQueueNum; i++) { + Message message = mockMessage(topic, i, String.valueOf(i)); + producer.send(message); + } + Assert.assertTrue(awaitDispatchMs(2000)); + int invisibleTime = 3; + PopResult popResult = popMessageAsync(Duration.ofSeconds(invisibleTime).toMillis(), writeQueueNum, 30000).get(); + assertEquals(PopStatus.FOUND, popResult.getPopStatus()); + assertEquals(writeQueueNum, popResult.getMsgFoundList().size()); + TestUtil.waitForSeconds(invisibleTime + 3); // wait retry created + + writeQueueNum = writeQueueNum * 2; + IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER1_NAME, writeQueueNum, CQType.SimpleCQ, TopicMessageType.PRIORITY); + for (int i = writeQueueNum / 2; i < writeQueueNum; i++) { + Message message = mockMessage(topic, i, String.valueOf(i)); + producer.send(message); + } + Assert.assertTrue(awaitDispatchMs(2000)); + + popResult = popMessageAsync(Duration.ofSeconds(invisibleTime).toMillis(), 32, 5000).get(); + List<MessageExt> msgList = popResult.getMsgFoundList(); + // asc == true, collect: [15 -> 8, 7 -> 0] + // asc == false, collect: [8 -> 15, 0 -> 7] + assertEquals(writeQueueNum, msgList.size()); + assertEquals(priorityOrderAsc ? writeQueueNum - 1 : writeQueueNum / 2, msgList.get(0).getQueueId()); + assertEquals(priorityOrderAsc ? writeQueueNum - 1 : writeQueueNum / 2, msgList.get(0).getPriority()); + assertEquals(priorityOrderAsc ? 0 : writeQueueNum / 2 - 1, msgList.get(msgList.size() - 1).getQueueId()); + assertEquals(priorityOrderAsc ? 0 : writeQueueNum / 2 - 1, msgList.get(msgList.size() - 1).getPriority()); + assertEquals(1, msgList.get(msgList.size() - 1).getReconsumeTimes()); + assertEquals(0, msgList.get(msgList.size() - 1).getQueueOffset()); // means a separate retry queue + } + + private static Message mockMessage(String topic, int priority, String key) { + Message msg = new Message(topic, "HW".getBytes()); + if (priority >= 0) { + msg.setPriority(priority); + } + msg.setKeys(key); + return msg; + } +} diff --git a/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java b/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java index b9798cfd5a..b2092db96a 100644 --- a/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java @@ -63,6 +63,7 @@ public class OffsetResetForPopIT extends BaseConf { public void setUp() throws Exception { // reset pop offset rely on server side offset brokerController1.getBrokerConfig().setUseServerSideResetOffset(true); + brokerController1.getBrokerConfig().setPopConsumerKVServiceEnable(false); // force disable before fifo resetOffset issue fixed adminExt = BaseConf.getAdmin(NAMESRV_ADDR); adminExt.start();
