This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch 5.0.0-alpha in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 0b4adb49b48b5addc0eafc4ef7412f9f72a9fbfa Author: zhangyang21 <[email protected]> AuthorDate: Mon Nov 22 18:31:13 2021 +0800 [Assignment] Fix the risk of memory overflow caused by excessive popShareQueueNum. Signed-off-by: zhangyang21 <[email protected]> --- .../broker/processor/QueryAssignmentProcessor.java | 65 +++++++++++-------- .../processor/QueryAssignmentProcessorTest.java | 75 ++++++++++++++++++++++ 2 files changed, 113 insertions(+), 27 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java index fdc320d..6fe9210 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java @@ -208,33 +208,8 @@ public class QueryAssignmentProcessor implements NettyRequestProcessor { } if (setMessageRequestModeRequestBody != null && setMessageRequestModeRequestBody.getMode() == MessageRequestMode.POP) { - if (setMessageRequestModeRequestBody.getPopShareQueueNum() <= 0) { - //each client pop all messagequeue - allocateResult = new ArrayList<>(mqAll.size()); - for (MessageQueue mq : mqAll) { - //must create new MessageQueue in case of change cache in AssignmentManager - MessageQueue newMq = new MessageQueue(mq.getTopic(), mq.getBrokerName(), -1); - allocateResult.add(newMq); - } - - } else { - if (cidAll.size() <= mqAll.size()) { - //consumer working in pop mode could share the MessageQueues assigned to the N (N = popWorkGroupSize) consumer following it in the cid list - allocateResult = allocateMessageQueueStrategy.allocate(consumerGroup, clientId, mqAll, cidAll); - int index = cidAll.indexOf(clientId); - if (index >= 0) { - for (int i = 1; i <= setMessageRequestModeRequestBody.getPopShareQueueNum(); i++) { - index++; - index = index % cidAll.size(); - List<MessageQueue> tmp = allocateMessageQueueStrategy.allocate(consumerGroup, cidAll.get(index), mqAll, cidAll); - allocateResult.addAll(tmp); - } - } - } else { - //make sure each cid is assigned - allocateResult = allocate(consumerGroup, clientId, mqAll, cidAll); - } - } + allocateResult = allocate4Pop(allocateMessageQueueStrategy, consumerGroup, clientId, mqAll, + cidAll, setMessageRequestModeRequestBody.getPopShareQueueNum()); } else { allocateResult = allocateMessageQueueStrategy.allocate(consumerGroup, clientId, mqAll, cidAll); @@ -256,6 +231,42 @@ public class QueryAssignmentProcessor implements NettyRequestProcessor { return assignedQueueSet; } + public List<MessageQueue> allocate4Pop(AllocateMessageQueueStrategy allocateMessageQueueStrategy, + final String consumerGroup, final String clientId, List<MessageQueue> mqAll, List<String> cidAll, + int popShareQueueNum) { + + List<MessageQueue> allocateResult; + if (popShareQueueNum <= 0 || popShareQueueNum >= cidAll.size() - 1) { + //each client pop all messagequeue + allocateResult = new ArrayList<>(mqAll.size()); + for (MessageQueue mq : mqAll) { + //must create new MessageQueue in case of change cache in AssignmentManager + MessageQueue newMq = new MessageQueue(mq.getTopic(), mq.getBrokerName(), -1); + allocateResult.add(newMq); + } + + } else { + if (cidAll.size() <= mqAll.size()) { + //consumer working in pop mode could share the MessageQueues assigned to the N (N = popWorkGroupSize) consumer following it in the cid list + allocateResult = allocateMessageQueueStrategy.allocate(consumerGroup, clientId, mqAll, cidAll); + int index = cidAll.indexOf(clientId); + if (index >= 0) { + for (int i = 1; i <= popShareQueueNum; i++) { + index++; + index = index % cidAll.size(); + List<MessageQueue> tmp = allocateMessageQueueStrategy.allocate(consumerGroup, cidAll.get(index), mqAll, cidAll); + allocateResult.addAll(tmp); + } + } + } else { + //make sure each cid is assigned + allocateResult = allocate(consumerGroup, clientId, mqAll, cidAll); + } + } + + return allocateResult; + } + private List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { if (currentCID == null || currentCID.length() < 1) { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessorTest.java index 681fcc3..b16533b 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessorTest.java @@ -19,9 +19,15 @@ package org.apache.rocketmq.broker.processor; import com.google.common.collect.ImmutableSet; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import java.util.ArrayList; +import java.util.List; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.loadbalance.AssignmentManager; +import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; +import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle; +import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueConsistentHash; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; @@ -40,6 +46,7 @@ import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -67,6 +74,7 @@ public class QueryAssignmentProcessorTest { @Mock private Channel channel; + private String broker = "defaultBroker"; private String topic = "FooBar"; private String group = "FooBarGroup"; private String clientId = "127.0.0.1"; @@ -118,6 +126,73 @@ public class QueryAssignmentProcessorTest { assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.NO_PERMISSION); } + + @Test + public void testAllocate4Pop() { + testAllocate4Pop(new AllocateMessageQueueAveragely()); + testAllocate4Pop(new AllocateMessageQueueAveragelyByCircle()); + testAllocate4Pop(new AllocateMessageQueueConsistentHash()); + } + + private void testAllocate4Pop(AllocateMessageQueueStrategy strategy) { + int testNum = 16; + List<MessageQueue> mqAll = new ArrayList<>(); + for (int mqSize = 0; mqSize < testNum; mqSize++) { + mqAll.add(new MessageQueue(topic, broker, mqSize)); + + List<String> cidAll = new ArrayList<>(); + for (int cidSize = 0; cidSize < testNum; cidSize++) { + String clientId = String.valueOf(cidSize); + cidAll.add(clientId); + + for (int popShareQueueNum = 0; popShareQueueNum < testNum; popShareQueueNum++) { + List<MessageQueue> allocateResult = + queryAssignmentProcessor.allocate4Pop(strategy, group, clientId, mqAll, cidAll, popShareQueueNum); + Assert.assertTrue(checkAllocateResult(popShareQueueNum, mqAll.size(), cidAll.size(), allocateResult.size(), strategy)); + } + } + } + } + + private boolean checkAllocateResult(int popShareQueueNum, int mqSize, int cidSize, int allocateSize, + AllocateMessageQueueStrategy strategy) { + + //The maximum size of allocations will not exceed mqSize. + if (allocateSize > mqSize) { + return false; + } + + //It is not allowed that the client is not assigned to the consumeQueue. + if (allocateSize <= 0) { + return false; + } + + if (popShareQueueNum <= 0 || popShareQueueNum >= cidSize - 1) { + return allocateSize == mqSize; + } else if (mqSize < cidSize) { + return allocateSize == 1; + } + + if (strategy instanceof AllocateMessageQueueAveragely + || strategy instanceof AllocateMessageQueueAveragelyByCircle) { + + if (mqSize % cidSize == 0) { + return allocateSize == (mqSize / cidSize) * (popShareQueueNum + 1); + } else { + int avgSize = mqSize / cidSize; + return allocateSize >= avgSize * (popShareQueueNum + 1) + && allocateSize <= (avgSize + 1) * (popShareQueueNum + 1); + } + } + + if (strategy instanceof AllocateMessageQueueConsistentHash) { + //Just skip + return true; + } + + return false; + } + private RemotingCommand createQueryAssignmentRequest() { QueryAssignmentRequestBody requestBody = new QueryAssignmentRequestBody(); requestBody.setTopic(topic);
