This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-dashboard.git


The following commit(s) were added to refs/heads/master by this push:
     new 823bce2   feat: add topic message type
823bce2 is described below

commit 823bce2b8b770f15ea63674d5e6d66a8d43479d9
Author: guangdashao <[email protected]>
AuthorDate: Tue Jun 4 11:40:46 2024 +0800

     feat: add topic message type
    
    add message type
---
 .../dashboard/model/request/TopicConfigInfo.java   |  18 ++-
 .../dashboard/service/impl/ClusterServiceImpl.java |   6 +
 .../dashboard/service/impl/TopicServiceImpl.java   | 151 +++++++++++++++------
 src/main/resources/application.yml                 |   8 +-
 src/main/resources/static/src/i18n/en.js           |   8 +-
 src/main/resources/static/src/i18n/zh.js           |   8 +-
 src/main/resources/static/src/topic.js             |   5 +-
 src/main/resources/static/view/pages/topic.html    |  13 ++
 8 files changed, 168 insertions(+), 49 deletions(-)

diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java
 
b/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java
index 2c633cd..6b9eb67 100644
--- 
a/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java
+++ 
b/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java
@@ -31,6 +31,7 @@ public class TopicConfigInfo {
     private int perm;
     private boolean order;
 
+    private String messageType;
     public List<String> getClusterNameList() {
         return clusterNameList;
     }
@@ -91,6 +92,18 @@ public class TopicConfigInfo {
         this.order = order;
     }
 
+
+    public String getMessageType() {
+        return messageType;
+    }
+
+    public void setMessageType(String messageType) {
+        this.messageType = messageType;
+    }
+
+
+
+
     @Override
     public boolean equals(Object o) {
         if (this == o)
@@ -102,12 +115,13 @@ public class TopicConfigInfo {
             readQueueNums == that.readQueueNums &&
             perm == that.perm &&
             order == that.order &&
-            Objects.equal(topicName, that.topicName);
+            Objects.equal(topicName, that.topicName) &&
+            Objects.equal(messageType, that.messageType);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hashCode(topicName, writeQueueNums, readQueueNums, 
perm, order);
+        return Objects.hashCode(topicName, writeQueueNums, readQueueNums, 
perm, order,messageType);
     }
 
 }
diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java
 
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java
index facf448..12e7f71 100644
--- 
a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java
+++ 
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.dashboard.service.impl;
 
+import org.apache.rocketmq.common.attribute.TopicMessageType;
 import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
 import org.apache.rocketmq.remoting.protocol.body.KVTable;
 import org.apache.rocketmq.remoting.protocol.route.BrokerData;
@@ -30,8 +31,10 @@ import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
 @Service
 public class ClusterServiceImpl implements ClusterService {
@@ -56,6 +59,9 @@ public class ClusterServiceImpl implements ClusterService {
             }
             resultMap.put("clusterInfo", clusterInfo);
             resultMap.put("brokerServer", brokerServer);
+            // add messageType
+            resultMap.put("messageTypes", 
Arrays.stream(TopicMessageType.values()).sorted()
+                    .collect(Collectors.toMap(TopicMessageType::getValue, 
messageType ->String.format("MESSAGE_TYPE_%s",messageType.getValue()))));
             return resultMap;
         }
         catch (Exception err) {
diff --git 
a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java
 
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java
index 1d7d571..ecd08de 100644
--- 
a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java
+++ 
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java
@@ -18,25 +18,24 @@
 package org.apache.rocketmq.dashboard.service.impl;
 
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.TransactionListener;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
 import org.apache.rocketmq.client.trace.TraceContext;
 import org.apache.rocketmq.client.trace.TraceDispatcher;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
 import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
-import org.apache.rocketmq.remoting.protocol.body.GroupList;
-import org.apache.rocketmq.remoting.protocol.body.TopicList;
-import org.apache.rocketmq.remoting.protocol.route.BrokerData;
-import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.dashboard.config.RMQConfigure;
 import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest;
@@ -44,6 +43,12 @@ import 
org.apache.rocketmq.dashboard.model.request.TopicConfigInfo;
 import org.apache.rocketmq.dashboard.service.AbstractCommonService;
 import org.apache.rocketmq.dashboard.service.TopicService;
 import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
+import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+import org.apache.rocketmq.remoting.protocol.body.GroupList;
+import org.apache.rocketmq.remoting.protocol.body.TopicList;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
 import org.apache.rocketmq.tools.command.CommandUtil;
 import org.joor.Reflect;
 import org.springframework.beans.BeanUtils;
@@ -55,6 +60,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static 
org.apache.rocketmq.common.TopicAttributes.TOPIC_MESSAGE_TYPE_ATTRIBUTE;
 
 @Service
 public class TopicServiceImpl extends AbstractCommonService implements 
TopicService {
@@ -68,18 +78,18 @@ public class TopicServiceImpl extends AbstractCommonService 
implements TopicServ
             TopicList allTopics = mqAdminExt.fetchAllTopicList();
             TopicList sysTopics = getSystemTopicList();
             Set<String> topics =
-                allTopics.getTopicList().stream().map(topic -> {
-                    if (!skipSysProcess && 
sysTopics.getTopicList().contains(topic)) {
-                        topic = String.format("%s%s", "%SYS%", topic);
-                    }
-                    return topic;
-                }).filter(topic -> {
-                    if (skipRetryAndDlq) {
-                        return 
!(topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)
-                            || 
topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX));
-                    }
-                    return true;
-                }).collect(Collectors.toSet());
+                    allTopics.getTopicList().stream().map(topic -> {
+                        if (!skipSysProcess && 
sysTopics.getTopicList().contains(topic)) {
+                            topic = String.format("%s%s", "%SYS%", topic);
+                        }
+                        return topic;
+                    }).filter(topic -> {
+                        if (skipRetryAndDlq) {
+                            return 
!(topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)
+                                    || 
topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX));
+                        }
+                        return true;
+                    }).collect(Collectors.toSet());
             allTopics.getTopicList().clear();
             allTopics.getTopicList().addAll(topics);
             return allTopics;
@@ -123,10 +133,15 @@ public class TopicServiceImpl extends 
AbstractCommonService implements TopicServ
     public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) {
         TopicConfig topicConfig = new TopicConfig();
         BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig);
+        String messageType = topicCreateOrUpdateRequest.getMessageType();
+        if (StringUtils.isBlank(messageType)) {
+            messageType = TopicMessageType.NORMAL.name();
+        }
+        
topicConfig.setAttributes(ImmutableMap.of("+".concat(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName()),
 messageType));
         try {
             ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
             for (String brokerName : 
changeToBrokerNameSet(clusterInfo.getClusterAddrTable(),
-                topicCreateOrUpdateRequest.getClusterNameList(), 
topicCreateOrUpdateRequest.getBrokerNameList())) {
+                    topicCreateOrUpdateRequest.getClusterNameList(), 
topicCreateOrUpdateRequest.getBrokerNameList())) {
                 
mqAdminExt.createAndUpdateTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(),
 topicConfig);
             }
         } catch (Exception err) {
@@ -156,6 +171,11 @@ public class TopicServiceImpl extends 
AbstractCommonService implements TopicServ
             TopicConfig topicConfig = examineTopicConfig(topic, 
brokerData.getBrokerName());
             BeanUtils.copyProperties(topicConfig, topicConfigInfo);
             
topicConfigInfo.setBrokerNameList(Lists.newArrayList(brokerData.getBrokerName()));
+            String messageType = 
topicConfig.getAttributes().get(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName());
+            if (StringUtils.isBlank(messageType)) {
+                messageType = TopicMessageType.UNSPECIFIED.name();
+            }
+            topicConfigInfo.setMessageType(messageType);
             topicConfigInfoList.add(topicConfigInfo);
         }
         return topicConfigInfoList;
@@ -226,6 +246,12 @@ public class TopicServiceImpl extends 
AbstractCommonService implements TopicServ
         return defaultMQProducer;
     }
 
+    public TransactionMQProducer buildTransactionMQProducer(String 
producerGroup, RPCHook rpcHook, boolean traceEnabled) {
+        TransactionMQProducer defaultMQProducer = new 
TransactionMQProducer(null, producerGroup, rpcHook, traceEnabled, 
TopicValidator.RMQ_SYS_TRACE_TOPIC);
+        defaultMQProducer.setUseTLS(configure.isUseTLS());
+        return defaultMQProducer;
+    }
+
     private TopicList getSystemTopicList() {
         RPCHook rpcHook = null;
         boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey()) 
&& !StringUtils.isEmpty(configure.getSecretKey());
@@ -249,32 +275,61 @@ public class TopicServiceImpl extends 
AbstractCommonService implements TopicServ
 
     @Override
     public SendResult sendTopicMessageRequest(SendTopicMessageRequest 
sendTopicMessageRequest) {
-        DefaultMQProducer producer = null;
+        List<TopicConfigInfo> topicConfigInfos = 
examineTopicConfig(sendTopicMessageRequest.getTopic());
+        String messageType = topicConfigInfos.get(0).getMessageType();
         AclClientRPCHook rpcHook = null;
         if (configure.isACLEnabled()) {
             rpcHook = new AclClientRPCHook(new SessionCredentials(
-                configure.getAccessKey(),
-                configure.getSecretKey()
+                    configure.getAccessKey(),
+                    configure.getSecretKey()
             ));
         }
-        producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, 
rpcHook, sendTopicMessageRequest.isTraceEnabled());
-        producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
-        producer.setNamesrvAddr(configure.getNamesrvAddr());
-        try {
-            producer.start();
-            Message msg = new Message(sendTopicMessageRequest.getTopic(),
-                sendTopicMessageRequest.getTag(),
-                sendTopicMessageRequest.getKey(),
-                sendTopicMessageRequest.getMessageBody().getBytes()
-            );
-            return producer.send(msg);
-        } catch (Exception e) {
-            Throwables.throwIfUnchecked(e);
-            throw new RuntimeException(e);
-        } finally {
-            waitSendTraceFinish(producer, 
sendTopicMessageRequest.isTraceEnabled());
-            producer.shutdown();
+        if (TopicMessageType.TRANSACTION.getValue().equals(messageType)) {
+            // transaction message
+            TransactionListener transactionListener = new 
TransactionListenerImpl();
+
+            TransactionMQProducer producer = 
buildTransactionMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook, 
sendTopicMessageRequest.isTraceEnabled());
+            
producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
+            producer.setNamesrvAddr(configure.getNamesrvAddr());
+            producer.setTransactionListener(transactionListener);
+            try {
+                producer.start();
+                Message msg = new Message(sendTopicMessageRequest.getTopic(),
+                        sendTopicMessageRequest.getTag(),
+                        sendTopicMessageRequest.getKey(),
+                        sendTopicMessageRequest.getMessageBody().getBytes()
+                );
+                return producer.sendMessageInTransaction(msg, null);
+            } catch (Exception e) {
+                Throwables.throwIfUnchecked(e);
+                throw new RuntimeException(e);
+            } finally {
+                waitSendTraceFinish(producer, 
sendTopicMessageRequest.isTraceEnabled());
+                producer.shutdown();
+            }
+        } else {
+            // no transaction message
+            DefaultMQProducer producer = null;
+            producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, 
rpcHook, sendTopicMessageRequest.isTraceEnabled());
+            
producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
+            producer.setNamesrvAddr(configure.getNamesrvAddr());
+            try {
+                producer.start();
+                Message msg = new Message(sendTopicMessageRequest.getTopic(),
+                        sendTopicMessageRequest.getTag(),
+                        sendTopicMessageRequest.getKey(),
+                        sendTopicMessageRequest.getMessageBody().getBytes()
+                );
+                return producer.send(msg);
+            } catch (Exception e) {
+                Throwables.throwIfUnchecked(e);
+                throw new RuntimeException(e);
+            } finally {
+                waitSendTraceFinish(producer, 
sendTopicMessageRequest.isTraceEnabled());
+                producer.shutdown();
+            }
         }
+
     }
 
     private void waitSendTraceFinish(DefaultMQProducer producer, boolean 
traceEnabled) {
@@ -296,4 +351,20 @@ public class TopicServiceImpl extends 
AbstractCommonService implements TopicServ
         } catch (Exception ignore) {
         }
     }
+
+    static class TransactionListenerImpl implements TransactionListener {
+        private AtomicInteger transactionIndex = new AtomicInteger(0);
+
+        private ConcurrentHashMap<String, Integer> localTrans = new 
ConcurrentHashMap<>();
+
+        @Override
+        public LocalTransactionState executeLocalTransaction(Message msg, 
Object arg) {
+            return LocalTransactionState.COMMIT_MESSAGE;
+        }
+
+        @Override
+        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
+            return LocalTransactionState.COMMIT_MESSAGE;
+        }
+    }
 }
diff --git a/src/main/resources/application.yml 
b/src/main/resources/application.yml
index 0ab405e..090e421 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -42,7 +42,9 @@ rocketmq:
     # configure multiple namesrv addresses to manage multiple different 
clusters
     namesrvAddrs:
       - 127.0.0.1:9876
-      - 127.0.0.2:9876
+    #      - 127.0.0.2:9876
+    #      - 10.151.47.32:9876;10.151.47.33:9876;10.151.47.34:9876
+    #      - 10.151.47.30:9876
     # if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should 
be false.default true
     isVIPChannel:
     # timeout for mqadminExt, default 5000ms
@@ -58,8 +60,8 @@ rocketmq:
     loginRequired: false
     useTLS: false
     # set the accessKey and secretKey if you used acl
-    accessKey: # if version > 4.4.0
-    secretKey: # if version > 4.4.0
+#    accessKey: rocketmq2
+#    secretKey: 12345678
 
 threadpool:
   config:
diff --git a/src/main/resources/static/src/i18n/en.js 
b/src/main/resources/static/src/i18n/en.js
index f9a4e3c..943ce48 100644
--- a/src/main/resources/static/src/i18n/en.js
+++ b/src/main/resources/static/src/i18n/en.js
@@ -123,5 +123,11 @@ var en = {
     "GROUP_PERM":"Group Permission",
     "SYNCHRONIZE":"Synchronize Data",
     "SHOW":"Show",
-    "HIDE":"Hide"
+    "HIDE":"Hide",
+    "MESSAGE_TYPE":"messageType",
+    "MESSAGE_TYPE_UNSPECIFIED": "UNSPECIFIED, is NORMAL",
+    "MESSAGE_TYPE_NORMAL": "NORMAL",
+    "MESSAGE_TYPE_FIFO": "FIFO",
+    "MESSAGE_TYPE_DELAY": "DELAY",
+    "MESSAGE_TYPE_TRANSACTION": "TRANSACTION",
 }
diff --git a/src/main/resources/static/src/i18n/zh.js 
b/src/main/resources/static/src/i18n/zh.js
index b6fa589..8a3b3ff 100644
--- a/src/main/resources/static/src/i18n/zh.js
+++ b/src/main/resources/static/src/i18n/zh.js
@@ -124,5 +124,11 @@ var zh = {
     "GROUP_PERM":"消费组权限",
     "SYNCHRONIZE":"同步",
     "SHOW":"显示",
-    "HIDE":"隐藏"
+    "HIDE":"隐藏",
+    "MESSAGE_TYPE":"消息类型",
+    "MESSAGE_TYPE_UNSPECIFIED": "未指定,为普通消息",
+    "MESSAGE_TYPE_NORMAL": "普通消息",
+    "MESSAGE_TYPE_FIFO": "顺序消息",
+    "MESSAGE_TYPE_DELAY": "定时/延时消息",
+    "MESSAGE_TYPE_TRANSACTION": "事务消息",
 }
\ No newline at end of file
diff --git a/src/main/resources/static/src/topic.js 
b/src/main/resources/static/src/topic.js
index 998f219..bce0df8 100644
--- a/src/main/resources/static/src/topic.js
+++ b/src/main/resources/static/src/topic.js
@@ -328,8 +328,8 @@ module.controller('topicController', ['$scope', 'ngDialog', 
'$http', 'Notificati
         var bIsUpdate = true;
         if (request == null) {
             request = [{
-                writeQueueNums: 16,
-                readQueueNums: 16,
+                writeQueueNums: 8,
+                readQueueNums: 8,
                 perm: 6,
                 order: false,
                 topicName: "",
@@ -355,6 +355,7 @@ module.controller('topicController', ['$scope', 'ngDialog', 
'$http', 'Notificati
                         topicRequestList: request,
                         allClusterNameList: 
Object.keys(resp.data.clusterInfo.clusterAddrTable),
                         allBrokerNameList: Object.keys(resp.data.brokerServer),
+                        allMessageTypeList: resp.data.messageTypes,
                         bIsUpdate: bIsUpdate,
                         writeOperationEnabled: $scope.writeOperationEnabled
                     }
diff --git a/src/main/resources/static/view/pages/topic.html 
b/src/main/resources/static/view/pages/topic.html
index bea5ac7..7547c10 100644
--- a/src/main/resources/static/view/pages/topic.html
+++ b/src/main/resources/static/view/pages/topic.html
@@ -63,6 +63,7 @@
                             <button class="btn btn-raised btn-sm btn-primary" 
type="button"
                                     ng-click="openUpdateDialog(topic, 
sysFlag)">topic {{'CONFIG' |translate}}
                             </button>
+<!--                            todo 发送消息,根据消息类型判断-->
                             <button class="btn btn-raised btn-sm btn-primary" 
type="button"
                                     ng-show="{{!sysFlag}}"
                                     
ng-click="openSendTopicMessageDialog(topic)">{{'SEND_MSG' | translate}}
@@ -189,6 +190,18 @@
                     <span class="text-danger" 
ng-show="addAppForm.topicName.$error.required">{{'TOPIC_NAME'|translate}}不能为空.</span>
                 </div>
             </div>
+            <!--            设置topic 类型 -->
+            <div class="form-group">
+                <label class="control-label 
col-sm-2">{{'MESSAGE_TYPE'|translate}}:</label>
+                <div class="col-sm-10">
+                    <select name="mySelectMessageType" chosen 
ng-disabled="ngDialogData.bIsUpdate"
+                            ng-model="item.messageType"
+                            ng-options="messageType as value | translate  
disable when messageType=='UNSPECIFIED' for (messageType , value) in 
ngDialogData.allMessageTypeList"
+                    >
+                        <option value=""></option>
+                    </select>
+                </div>
+            </div>
             <div class="form-group">
                 <label class="control-label 
col-sm-2">{{'WRITE_QUEUE_NUMS'|translate}}:</label>
                 <div class="col-sm-10">

Reply via email to