This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new fb3b87da1b [ISSUE #8984] Fix the broker switch enableMixedMessageType doesn't work
fb3b87da1b is described below
commit fb3b87da1bb3337039cc80d7a3fcf2dff4bd6ce3
Author: Liu Shengzhong <[email protected]>
AuthorDate: Mon Dec 2 10:01:51 2024 +0800
[ISSUE #8984] Fix the broker switch enableMixedMessageType doesn't work
---
.../broker/processor/AdminBrokerProcessor.java | 29 +++++++++------
.../broker/processor/AdminBrokerProcessorTest.java | 42 ++++++++++++++++++++++
2 files changed, 61 insertions(+), 10 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index cc70e69a46..fc3b618273 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -76,6 +76,7 @@ import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PlainAccessConfig;
+import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UnlockCallback;
import org.apache.rocketmq.common.UtilAll;
@@ -534,11 +535,15 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
String attributesModification = requestHeader.getAttributes();
topicConfig.setAttributes(AttributeParser.parseToMap(attributesModification));
- if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED
- &&
!brokerController.getBrokerConfig().isEnableMixedMessageType()) {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("MIXED message type is not supported.");
- return response;
+ if (!brokerController.getBrokerConfig().isEnableMixedMessageType()
&& topicConfig.getAttributes() != null) {
+ // Get attribute by key with prefix sign
+ String msgTypeAttrKey = AttributeParser.ATTR_ADD_PLUS_SIGN +
TopicAttributes.TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName();
+ String msgTypeAttrValue =
topicConfig.getAttributes().get(msgTypeAttrKey);
+ if (msgTypeAttrValue != null &&
msgTypeAttrValue.equals(TopicMessageType.MIXED.getValue())) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("MIXED message type is not supported.");
+ return response;
+ }
}
if
(topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic)))
{
@@ -609,11 +614,15 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
return response;
}
}
- if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED
- &&
!brokerController.getBrokerConfig().isEnableMixedMessageType()) {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("MIXED message type is not supported.");
- return response;
+ if
(!brokerController.getBrokerConfig().isEnableMixedMessageType() &&
topicConfig.getAttributes() != null) {
+ // Get attribute by key with prefix sign
+ String msgTypeAttrKey = AttributeParser.ATTR_ADD_PLUS_SIGN + TopicAttributes.TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName();
+ String msgTypeAttrValue =
topicConfig.getAttributes().get(msgTypeAttrKey);
+ if (msgTypeAttrValue != null &&
msgTypeAttrValue.equals(TopicMessageType.MIXED.getValue())) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("MIXED message type is not supported.");
+ return response;
+ }
}
if
(topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic)))
{
LOGGER.info("Broker receive request to update or create topic={}, but topicConfig has no changes , so idempotent, caller address={}",
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index d87f513355..48ddb89172 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -48,6 +48,7 @@ import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.TopicQueueId;
import org.apache.rocketmq.common.action.Action;
+import org.apache.rocketmq.common.attribute.AttributeParser;
import org.apache.rocketmq.common.constant.FIleReadaheadMode;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -330,6 +331,19 @@ public class AdminBrokerProcessorTest {
request = buildCreateTopicRequest(topic);
response = adminBrokerProcessor.processRequest(handlerContext,
request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+ // test deny MIXED topic type
+ brokerController.getBrokerConfig().setEnableMixedMessageType(false);
+ topic = "TEST_MIXED_TYPE";
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put("+message.type", "MIXED");
+ request = buildCreateTopicRequest(topic, attributes);
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+ // test allow MIXED topic type
+ brokerController.getBrokerConfig().setEnableMixedMessageType(true);
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
@Test
@@ -355,6 +369,20 @@ public class AdminBrokerProcessorTest {
//test no changes
response = adminBrokerProcessor.processRequest(handlerContext,
request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+ // test deny MIXED topic type
+ brokerController.getBrokerConfig().setEnableMixedMessageType(false);
+ topicList.add("TEST_MIXED_TYPE");
+ topicList.add("TEST_MIXED_TYPE1");
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put("+message.type", "MIXED");
+ request = buildCreateTopicListRequest(topicList, attributes);
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+ // test allow MIXED topic type
+ brokerController.getBrokerConfig().setEnableMixedMessageType(true);
+ response = adminBrokerProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
@Test
@@ -1312,18 +1340,29 @@ public class AdminBrokerProcessorTest {
}
private RemotingCommand buildCreateTopicRequest(String topic) {
+ return buildCreateTopicRequest(topic, null);
+ }
+
+ private RemotingCommand buildCreateTopicRequest(String topic, Map<String,
String> attributes) {
CreateTopicRequestHeader requestHeader = new
CreateTopicRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setTopicFilterType(TopicFilterType.SINGLE_TAG.name());
requestHeader.setReadQueueNums(8);
requestHeader.setWriteQueueNums(8);
requestHeader.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
+ if (attributes != null) {
+
requestHeader.setAttributes(AttributeParser.parseToString(attributes));
+ }
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC,
requestHeader);
request.makeCustomHeaderToNet();
return request;
}
private RemotingCommand buildCreateTopicListRequest(List<String>
topicList) {
+ return buildCreateTopicListRequest(topicList, null);
+ }
+
+ private RemotingCommand buildCreateTopicListRequest(List<String>
topicList, Map<String, String> attributes) {
List<TopicConfig> topicConfigList = new ArrayList<>();
for (String topic:topicList) {
TopicConfig topicConfig = new TopicConfig(topic);
@@ -1333,6 +1372,9 @@ public class AdminBrokerProcessorTest {
topicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
topicConfig.setTopicSysFlag(0);
topicConfig.setOrder(false);
+ if (attributes != null) {
+ topicConfig.setAttributes(new HashMap<>(attributes));
+ }
topicConfigList.add(topicConfig);
}
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC_LIST,
null);