This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 3bdabf703b [ISSUE #7109] support the mixed topic type (#7110)
3bdabf703b is described below
commit 3bdabf703b883fea6181df8889c08d1e91202291
Author: ShuangxiDing <[email protected]>
AuthorDate: Thu Aug 3 16:10:24 2023 +0800
[ISSUE #7109] support the mixed topic type (#7110)
* Add mixed message type.
* support mixed topic type for grpc server
* remove the unnecessary parentheses around expression
* format the javadoc
---
.../common/attribute/TopicMessageType.java | 5 +++--
.../proxy/grpc/v2/route/RouteActivity.java | 22 +++++++++++++---------
.../DefaultTopicMessageTypeValidator.java | 7 ++++---
.../validator/TopicMessageTypeValidator.java | 6 +++---
4 files changed, 23 insertions(+), 17 deletions(-)
diff --git
a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
index 77629e4c90..9680acec74 100644
---
a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
+++
b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
@@ -27,7 +27,8 @@ public enum TopicMessageType {
NORMAL("NORMAL"),
FIFO("FIFO"),
DELAY("DELAY"),
- TRANSACTION("TRANSACTION");
+ TRANSACTION("TRANSACTION"),
+ MIXED("MIXED");
private final String value;
TopicMessageType(String value) {
@@ -35,7 +36,7 @@ public enum TopicMessageType {
}
public static Set<String> topicMessageTypeSet() {
- return Sets.newHashSet(UNSPECIFIED.value, NORMAL.value, FIFO.value,
DELAY.value, TRANSACTION.value);
+ return Sets.newHashSet(UNSPECIFIED.value, NORMAL.value, FIFO.value,
DELAY.value, TRANSACTION.value, MIXED.value);
}
public String getValue() {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
index c5d485691b..02dea0cda7 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
@@ -32,6 +32,8 @@ import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.Resource;
import com.google.common.net.HostAndPort;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -259,7 +261,7 @@ public class RouteActivity extends AbstractMessingActivity {
MessageQueue messageQueue =
MessageQueue.newBuilder().setBroker(broker).setTopic(topic)
.setId(queueIdIndex++)
.setPermission(Permission.READ)
- .addAcceptMessageTypes(parseTopicMessageType(topicMessageType))
+
.addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType))
.build();
messageQueueList.add(messageQueue);
}
@@ -268,7 +270,7 @@ public class RouteActivity extends AbstractMessingActivity {
MessageQueue messageQueue =
MessageQueue.newBuilder().setBroker(broker).setTopic(topic)
.setId(queueIdIndex++)
.setPermission(Permission.WRITE)
- .addAcceptMessageTypes(parseTopicMessageType(topicMessageType))
+
.addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType))
.build();
messageQueueList.add(messageQueue);
}
@@ -277,7 +279,7 @@ public class RouteActivity extends AbstractMessingActivity {
MessageQueue messageQueue =
MessageQueue.newBuilder().setBroker(broker).setTopic(topic)
.setId(queueIdIndex++)
.setPermission(Permission.READ_WRITE)
- .addAcceptMessageTypes(parseTopicMessageType(topicMessageType))
+
.addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType))
.build();
messageQueueList.add(messageQueue);
}
@@ -285,18 +287,20 @@ public class RouteActivity extends
AbstractMessingActivity {
return messageQueueList;
}
- private MessageType parseTopicMessageType(TopicMessageType
topicMessageType) {
+ private List<MessageType> parseTopicMessageType(TopicMessageType
topicMessageType) {
switch (topicMessageType) {
case NORMAL:
- return MessageType.NORMAL;
+ return Collections.singletonList(MessageType.NORMAL);
case FIFO:
- return MessageType.FIFO;
+ return Collections.singletonList(MessageType.FIFO);
case TRANSACTION:
- return MessageType.TRANSACTION;
+ return Collections.singletonList(MessageType.TRANSACTION);
case DELAY:
- return MessageType.DELAY;
+ return Collections.singletonList(MessageType.DELAY);
+ case MIXED:
+ return Arrays.asList(MessageType.NORMAL, MessageType.FIFO,
MessageType.DELAY, MessageType.TRANSACTION);
default:
- return MessageType.MESSAGE_TYPE_UNSPECIFIED;
+ return
Collections.singletonList(MessageType.MESSAGE_TYPE_UNSPECIFIED);
}
}
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java
index bc2fcf30fb..83588f110c 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java
@@ -23,9 +23,10 @@ import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
public class DefaultTopicMessageTypeValidator implements
TopicMessageTypeValidator {
- public void validate(TopicMessageType topicMessageType, TopicMessageType
messageType) {
- if (messageType.equals(TopicMessageType.UNSPECIFIED) ||
!messageType.equals(topicMessageType)) {
- String errorInfo = String.format("TopicMessageType validate
failed, topic type is %s, message type is %s", topicMessageType, messageType);
+ public void validate(TopicMessageType expectedType, TopicMessageType
actualType) {
+ if (actualType.equals(TopicMessageType.UNSPECIFIED)
+ || !actualType.equals(expectedType) &&
!expectedType.equals(TopicMessageType.MIXED)) {
+ String errorInfo = String.format("TopicMessageType validate
failed, the expected type is %s, but actual type is %s", expectedType,
actualType);
throw new
ProxyException(ProxyExceptionCode.MESSAGE_PROPERTY_CONFLICT_WITH_TYPE,
errorInfo);
}
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java
index 137be90956..32758da502 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java
@@ -23,8 +23,8 @@ public interface TopicMessageTypeValidator {
/**
* Will throw {@link org.apache.rocketmq.proxy.common.ProxyException} if
validate failed.
*
- * @param topicMessageType Target topic
- * @param messageType Message's type
+ * @param expectedType Target topic
+ * @param actualType Message's type
*/
- void validate(TopicMessageType topicMessageType, TopicMessageType
messageType);
+ void validate(TopicMessageType expectedType, TopicMessageType actualType);
}