This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3bdabf703b [ISSUE #7109] support the mixed topic type (#7110)
3bdabf703b is described below

commit 3bdabf703b883fea6181df8889c08d1e91202291
Author: ShuangxiDing <[email protected]>
AuthorDate: Thu Aug 3 16:10:24 2023 +0800

    [ISSUE #7109] support the mixed topic type (#7110)
    
    * Add mixed message type.
    
    * support mixed topic type for grpc server
    
    * remove the unnecessary parentheses around expression
    
    * format the javadoc
---
 .../common/attribute/TopicMessageType.java         |  5 +++--
 .../proxy/grpc/v2/route/RouteActivity.java         | 22 +++++++++++++---------
 .../DefaultTopicMessageTypeValidator.java          |  7 ++++---
 .../validator/TopicMessageTypeValidator.java       |  6 +++---
 4 files changed, 23 insertions(+), 17 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
index 77629e4c90..9680acec74 100644
--- a/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/TopicMessageType.java
@@ -27,7 +27,8 @@ public enum TopicMessageType {
     NORMAL("NORMAL"),
     FIFO("FIFO"),
     DELAY("DELAY"),
-    TRANSACTION("TRANSACTION");
+    TRANSACTION("TRANSACTION"),
+    MIXED("MIXED");
 
     private final String value;
     TopicMessageType(String value) {
@@ -35,7 +36,7 @@ public enum TopicMessageType {
     }
 
     public static Set<String> topicMessageTypeSet() {
-        return Sets.newHashSet(UNSPECIFIED.value, NORMAL.value, FIFO.value, DELAY.value, TRANSACTION.value);
+        return Sets.newHashSet(UNSPECIFIED.value, NORMAL.value, FIFO.value, DELAY.value, TRANSACTION.value, MIXED.value);
     }
 
     public String getValue() {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
index c5d485691b..02dea0cda7 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
@@ -32,6 +32,8 @@ import apache.rocketmq.v2.QueryRouteResponse;
 import apache.rocketmq.v2.Resource;
 import com.google.common.net.HostAndPort;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -259,7 +261,7 @@ public class RouteActivity extends AbstractMessingActivity {
             MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic)
                 .setId(queueIdIndex++)
                 .setPermission(Permission.READ)
-                .addAcceptMessageTypes(parseTopicMessageType(topicMessageType))
+                .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType))
                 .build();
             messageQueueList.add(messageQueue);
         }
@@ -268,7 +270,7 @@ public class RouteActivity extends AbstractMessingActivity {
             MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic)
                 .setId(queueIdIndex++)
                 .setPermission(Permission.WRITE)
-                .addAcceptMessageTypes(parseTopicMessageType(topicMessageType))
+                .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType))
                 .build();
             messageQueueList.add(messageQueue);
         }
@@ -277,7 +279,7 @@ public class RouteActivity extends AbstractMessingActivity {
             MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic)
                 .setId(queueIdIndex++)
                 .setPermission(Permission.READ_WRITE)
-                .addAcceptMessageTypes(parseTopicMessageType(topicMessageType))
+                .addAllAcceptMessageTypes(parseTopicMessageType(topicMessageType))
                 .build();
             messageQueueList.add(messageQueue);
         }
@@ -285,18 +287,20 @@ public class RouteActivity extends AbstractMessingActivity {
         return messageQueueList;
     }
 
-    private MessageType parseTopicMessageType(TopicMessageType topicMessageType) {
+    private List<MessageType> parseTopicMessageType(TopicMessageType topicMessageType) {
         switch (topicMessageType) {
             case NORMAL:
-                return MessageType.NORMAL;
+                return Collections.singletonList(MessageType.NORMAL);
             case FIFO:
-                return MessageType.FIFO;
+                return Collections.singletonList(MessageType.FIFO);
             case TRANSACTION:
-                return MessageType.TRANSACTION;
+                return Collections.singletonList(MessageType.TRANSACTION);
             case DELAY:
-                return MessageType.DELAY;
+                return Collections.singletonList(MessageType.DELAY);
+            case MIXED:
+                return Arrays.asList(MessageType.NORMAL, MessageType.FIFO, MessageType.DELAY, MessageType.TRANSACTION);
             default:
-                return MessageType.MESSAGE_TYPE_UNSPECIFIED;
+                return Collections.singletonList(MessageType.MESSAGE_TYPE_UNSPECIFIED);
         }
     }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java
index bc2fcf30fb..83588f110c 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/DefaultTopicMessageTypeValidator.java
@@ -23,9 +23,10 @@ import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
 
 public class DefaultTopicMessageTypeValidator implements TopicMessageTypeValidator {
 
-    public void validate(TopicMessageType topicMessageType, TopicMessageType messageType) {
-        if (messageType.equals(TopicMessageType.UNSPECIFIED) || !messageType.equals(topicMessageType)) {
-            String errorInfo = String.format("TopicMessageType validate failed, topic type is %s, message type is %s", topicMessageType, messageType);
+    public void validate(TopicMessageType expectedType, TopicMessageType actualType) {
+        if (actualType.equals(TopicMessageType.UNSPECIFIED)
+                || !actualType.equals(expectedType) && !expectedType.equals(TopicMessageType.MIXED)) {
+            String errorInfo = String.format("TopicMessageType validate failed, the expected type is %s, but actual type is %s", expectedType, actualType);
             throw new ProxyException(ProxyExceptionCode.MESSAGE_PROPERTY_CONFLICT_WITH_TYPE, errorInfo);
         }
     }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java
index 137be90956..32758da502 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/validator/TopicMessageTypeValidator.java
@@ -23,8 +23,8 @@ public interface TopicMessageTypeValidator {
     /**
      * Will throw {@link org.apache.rocketmq.proxy.common.ProxyException} if validate failed.
      *
-     * @param topicMessageType Target topic
-     * @param messageType      Message's type
+     * @param expectedType Target topic
+     * @param actualType   Message's type
      */
-    void validate(TopicMessageType topicMessageType, TopicMessageType messageType);
+    void validate(TopicMessageType expectedType, TopicMessageType actualType);
 }

Reply via email to