This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-dashboard.git
The following commit(s) were added to refs/heads/master by this push:
new 823bce2 feat: add topic message type
823bce2 is described below
commit 823bce2b8b770f15ea63674d5e6d66a8d43479d9
Author: guangdashao <[email protected]>
AuthorDate: Tue Jun 4 11:40:46 2024 +0800
feat: add topic message type
add message type
---
.../dashboard/model/request/TopicConfigInfo.java | 18 ++-
.../dashboard/service/impl/ClusterServiceImpl.java | 6 +
.../dashboard/service/impl/TopicServiceImpl.java | 151 +++++++++++++++------
src/main/resources/application.yml | 8 +-
src/main/resources/static/src/i18n/en.js | 8 +-
src/main/resources/static/src/i18n/zh.js | 8 +-
src/main/resources/static/src/topic.js | 5 +-
src/main/resources/static/view/pages/topic.html | 13 ++
8 files changed, 168 insertions(+), 49 deletions(-)
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java
b/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java
index 2c633cd..6b9eb67 100644
---
a/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java
+++
b/src/main/java/org/apache/rocketmq/dashboard/model/request/TopicConfigInfo.java
@@ -31,6 +31,7 @@ public class TopicConfigInfo {
private int perm;
private boolean order;
+ private String messageType;
public List<String> getClusterNameList() {
return clusterNameList;
}
@@ -91,6 +92,18 @@ public class TopicConfigInfo {
this.order = order;
}
+
+ public String getMessageType() {
+ return messageType;
+ }
+
+ public void setMessageType(String messageType) {
+ this.messageType = messageType;
+ }
+
+
+
+
@Override
public boolean equals(Object o) {
if (this == o)
@@ -102,12 +115,13 @@ public class TopicConfigInfo {
readQueueNums == that.readQueueNums &&
perm == that.perm &&
order == that.order &&
- Objects.equal(topicName, that.topicName);
+ Objects.equal(topicName, that.topicName) &&
+ Objects.equal(messageType, that.messageType);
}
@Override
public int hashCode() {
- return Objects.hashCode(topicName, writeQueueNums, readQueueNums,
perm, order);
+ return Objects.hashCode(topicName, writeQueueNums, readQueueNums,
perm, order,messageType);
}
}
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java
index facf448..12e7f71 100644
---
a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java
+++
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ClusterServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.dashboard.service.impl;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
@@ -30,8 +31,10 @@ import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
+import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
+import java.util.stream.Collectors;
@Service
public class ClusterServiceImpl implements ClusterService {
@@ -56,6 +59,9 @@ public class ClusterServiceImpl implements ClusterService {
}
resultMap.put("clusterInfo", clusterInfo);
resultMap.put("brokerServer", brokerServer);
+ // add messageType
+ resultMap.put("messageTypes",
Arrays.stream(TopicMessageType.values()).sorted()
+ .collect(Collectors.toMap(TopicMessageType::getValue,
messageType ->String.format("MESSAGE_TYPE_%s",messageType.getValue()))));
return resultMap;
}
catch (Exception err) {
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java
index 1d7d571..ecd08de 100644
---
a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java
+++
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java
@@ -18,25 +18,24 @@
package org.apache.rocketmq.dashboard.service.impl;
import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.TransactionListener;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
-import org.apache.rocketmq.remoting.protocol.body.GroupList;
-import org.apache.rocketmq.remoting.protocol.body.TopicList;
-import org.apache.rocketmq.remoting.protocol.route.BrokerData;
-import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.request.SendTopicMessageRequest;
@@ -44,6 +43,12 @@ import
org.apache.rocketmq.dashboard.model.request.TopicConfigInfo;
import org.apache.rocketmq.dashboard.service.AbstractCommonService;
import org.apache.rocketmq.dashboard.service.TopicService;
import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
+import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+import org.apache.rocketmq.remoting.protocol.body.GroupList;
+import org.apache.rocketmq.remoting.protocol.body.TopicList;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.joor.Reflect;
import org.springframework.beans.BeanUtils;
@@ -55,6 +60,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static
org.apache.rocketmq.common.TopicAttributes.TOPIC_MESSAGE_TYPE_ATTRIBUTE;
@Service
public class TopicServiceImpl extends AbstractCommonService implements
TopicService {
@@ -68,18 +78,18 @@ public class TopicServiceImpl extends AbstractCommonService
implements TopicServ
TopicList allTopics = mqAdminExt.fetchAllTopicList();
TopicList sysTopics = getSystemTopicList();
Set<String> topics =
- allTopics.getTopicList().stream().map(topic -> {
- if (!skipSysProcess &&
sysTopics.getTopicList().contains(topic)) {
- topic = String.format("%s%s", "%SYS%", topic);
- }
- return topic;
- }).filter(topic -> {
- if (skipRetryAndDlq) {
- return
!(topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)
- ||
topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX));
- }
- return true;
- }).collect(Collectors.toSet());
+ allTopics.getTopicList().stream().map(topic -> {
+ if (!skipSysProcess &&
sysTopics.getTopicList().contains(topic)) {
+ topic = String.format("%s%s", "%SYS%", topic);
+ }
+ return topic;
+ }).filter(topic -> {
+ if (skipRetryAndDlq) {
+ return
!(topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)
+ ||
topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX));
+ }
+ return true;
+ }).collect(Collectors.toSet());
allTopics.getTopicList().clear();
allTopics.getTopicList().addAll(topics);
return allTopics;
@@ -123,10 +133,15 @@ public class TopicServiceImpl extends
AbstractCommonService implements TopicServ
public void createOrUpdate(TopicConfigInfo topicCreateOrUpdateRequest) {
TopicConfig topicConfig = new TopicConfig();
BeanUtils.copyProperties(topicCreateOrUpdateRequest, topicConfig);
+ String messageType = topicCreateOrUpdateRequest.getMessageType();
+ if (StringUtils.isBlank(messageType)) {
+ messageType = TopicMessageType.NORMAL.name();
+ }
+
topicConfig.setAttributes(ImmutableMap.of("+".concat(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName()),
messageType));
try {
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
for (String brokerName :
changeToBrokerNameSet(clusterInfo.getClusterAddrTable(),
- topicCreateOrUpdateRequest.getClusterNameList(),
topicCreateOrUpdateRequest.getBrokerNameList())) {
+ topicCreateOrUpdateRequest.getClusterNameList(),
topicCreateOrUpdateRequest.getBrokerNameList())) {
mqAdminExt.createAndUpdateTopicConfig(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(),
topicConfig);
}
} catch (Exception err) {
@@ -156,6 +171,11 @@ public class TopicServiceImpl extends
AbstractCommonService implements TopicServ
TopicConfig topicConfig = examineTopicConfig(topic,
brokerData.getBrokerName());
BeanUtils.copyProperties(topicConfig, topicConfigInfo);
topicConfigInfo.setBrokerNameList(Lists.newArrayList(brokerData.getBrokerName()));
+ String messageType =
topicConfig.getAttributes().get(TOPIC_MESSAGE_TYPE_ATTRIBUTE.getName());
+ if (StringUtils.isBlank(messageType)) {
+ messageType = TopicMessageType.UNSPECIFIED.name();
+ }
+ topicConfigInfo.setMessageType(messageType);
topicConfigInfoList.add(topicConfigInfo);
}
return topicConfigInfoList;
@@ -226,6 +246,12 @@ public class TopicServiceImpl extends
AbstractCommonService implements TopicServ
return defaultMQProducer;
}
+ public TransactionMQProducer buildTransactionMQProducer(String
producerGroup, RPCHook rpcHook, boolean traceEnabled) {
+ TransactionMQProducer defaultMQProducer = new
TransactionMQProducer(null, producerGroup, rpcHook, traceEnabled,
TopicValidator.RMQ_SYS_TRACE_TOPIC);
+ defaultMQProducer.setUseTLS(configure.isUseTLS());
+ return defaultMQProducer;
+ }
+
private TopicList getSystemTopicList() {
RPCHook rpcHook = null;
boolean isEnableAcl = !StringUtils.isEmpty(configure.getAccessKey())
&& !StringUtils.isEmpty(configure.getSecretKey());
@@ -249,32 +275,61 @@ public class TopicServiceImpl extends
AbstractCommonService implements TopicServ
@Override
public SendResult sendTopicMessageRequest(SendTopicMessageRequest
sendTopicMessageRequest) {
- DefaultMQProducer producer = null;
+ List<TopicConfigInfo> topicConfigInfos =
examineTopicConfig(sendTopicMessageRequest.getTopic());
+ String messageType = topicConfigInfos.get(0).getMessageType();
AclClientRPCHook rpcHook = null;
if (configure.isACLEnabled()) {
rpcHook = new AclClientRPCHook(new SessionCredentials(
- configure.getAccessKey(),
- configure.getSecretKey()
+ configure.getAccessKey(),
+ configure.getSecretKey()
));
}
- producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP,
rpcHook, sendTopicMessageRequest.isTraceEnabled());
- producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
- producer.setNamesrvAddr(configure.getNamesrvAddr());
- try {
- producer.start();
- Message msg = new Message(sendTopicMessageRequest.getTopic(),
- sendTopicMessageRequest.getTag(),
- sendTopicMessageRequest.getKey(),
- sendTopicMessageRequest.getMessageBody().getBytes()
- );
- return producer.send(msg);
- } catch (Exception e) {
- Throwables.throwIfUnchecked(e);
- throw new RuntimeException(e);
- } finally {
- waitSendTraceFinish(producer,
sendTopicMessageRequest.isTraceEnabled());
- producer.shutdown();
+ if (TopicMessageType.TRANSACTION.getValue().equals(messageType)) {
+ // transaction message
+ TransactionListener transactionListener = new
TransactionListenerImpl();
+
+ TransactionMQProducer producer =
buildTransactionMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook,
sendTopicMessageRequest.isTraceEnabled());
+
producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
+ producer.setNamesrvAddr(configure.getNamesrvAddr());
+ producer.setTransactionListener(transactionListener);
+ try {
+ producer.start();
+ Message msg = new Message(sendTopicMessageRequest.getTopic(),
+ sendTopicMessageRequest.getTag(),
+ sendTopicMessageRequest.getKey(),
+ sendTopicMessageRequest.getMessageBody().getBytes()
+ );
+ return producer.sendMessageInTransaction(msg, null);
+ } catch (Exception e) {
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
+ } finally {
+ waitSendTraceFinish(producer,
sendTopicMessageRequest.isTraceEnabled());
+ producer.shutdown();
+ }
+ } else {
+ // no transaction message
+ DefaultMQProducer producer = null;
+ producer = buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP,
rpcHook, sendTopicMessageRequest.isTraceEnabled());
+
producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
+ producer.setNamesrvAddr(configure.getNamesrvAddr());
+ try {
+ producer.start();
+ Message msg = new Message(sendTopicMessageRequest.getTopic(),
+ sendTopicMessageRequest.getTag(),
+ sendTopicMessageRequest.getKey(),
+ sendTopicMessageRequest.getMessageBody().getBytes()
+ );
+ return producer.send(msg);
+ } catch (Exception e) {
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
+ } finally {
+ waitSendTraceFinish(producer,
sendTopicMessageRequest.isTraceEnabled());
+ producer.shutdown();
+ }
}
+
}
private void waitSendTraceFinish(DefaultMQProducer producer, boolean
traceEnabled) {
@@ -296,4 +351,20 @@ public class TopicServiceImpl extends
AbstractCommonService implements TopicServ
} catch (Exception ignore) {
}
}
+
+ static class TransactionListenerImpl implements TransactionListener {
+ private AtomicInteger transactionIndex = new AtomicInteger(0);
+
+ private ConcurrentHashMap<String, Integer> localTrans = new
ConcurrentHashMap<>();
+
+ @Override
+ public LocalTransactionState executeLocalTransaction(Message msg,
Object arg) {
+ return LocalTransactionState.COMMIT_MESSAGE;
+ }
+
+ @Override
+ public LocalTransactionState checkLocalTransaction(MessageExt msg) {
+ return LocalTransactionState.COMMIT_MESSAGE;
+ }
+ }
}
diff --git a/src/main/resources/application.yml
b/src/main/resources/application.yml
index 0ab405e..090e421 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -42,7 +42,9 @@ rocketmq:
# configure multiple namesrv addresses to manage multiple different
clusters
namesrvAddrs:
- 127.0.0.1:9876
- - 127.0.0.2:9876
+ # - 127.0.0.2:9876
+ # - 10.151.47.32:9876;10.151.47.33:9876;10.151.47.34:9876
+ # - 10.151.47.30:9876
# if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should
be false.default true
isVIPChannel:
# timeout for mqadminExt, default 5000ms
@@ -58,8 +60,8 @@ rocketmq:
loginRequired: false
useTLS: false
# set the accessKey and secretKey if you used acl
- accessKey: # if version > 4.4.0
- secretKey: # if version > 4.4.0
+# accessKey: rocketmq2
+# secretKey: 12345678
threadpool:
config:
diff --git a/src/main/resources/static/src/i18n/en.js
b/src/main/resources/static/src/i18n/en.js
index f9a4e3c..943ce48 100644
--- a/src/main/resources/static/src/i18n/en.js
+++ b/src/main/resources/static/src/i18n/en.js
@@ -123,5 +123,11 @@ var en = {
"GROUP_PERM":"Group Permission",
"SYNCHRONIZE":"Synchronize Data",
"SHOW":"Show",
- "HIDE":"Hide"
+ "HIDE":"Hide",
+ "MESSAGE_TYPE":"messageType",
+ "MESSAGE_TYPE_UNSPECIFIED": "UNSPECIFIED, is NORMAL",
+ "MESSAGE_TYPE_NORMAL": "NORMAL",
+ "MESSAGE_TYPE_FIFO": "FIFO",
+ "MESSAGE_TYPE_DELAY": "DELAY",
+ "MESSAGE_TYPE_TRANSACTION": "TRANSACTION",
}
diff --git a/src/main/resources/static/src/i18n/zh.js
b/src/main/resources/static/src/i18n/zh.js
index b6fa589..8a3b3ff 100644
--- a/src/main/resources/static/src/i18n/zh.js
+++ b/src/main/resources/static/src/i18n/zh.js
@@ -124,5 +124,11 @@ var zh = {
"GROUP_PERM":"消费组权限",
"SYNCHRONIZE":"同步",
"SHOW":"显示",
- "HIDE":"隐藏"
+ "HIDE":"隐藏",
+ "MESSAGE_TYPE":"消息类型",
+ "MESSAGE_TYPE_UNSPECIFIED": "未指定,为普通消息",
+ "MESSAGE_TYPE_NORMAL": "普通消息",
+ "MESSAGE_TYPE_FIFO": "顺序消息",
+ "MESSAGE_TYPE_DELAY": "定时/延时消息",
+ "MESSAGE_TYPE_TRANSACTION": "事务消息",
}
\ No newline at end of file
diff --git a/src/main/resources/static/src/topic.js
b/src/main/resources/static/src/topic.js
index 998f219..bce0df8 100644
--- a/src/main/resources/static/src/topic.js
+++ b/src/main/resources/static/src/topic.js
@@ -328,8 +328,8 @@ module.controller('topicController', ['$scope', 'ngDialog',
'$http', 'Notificati
var bIsUpdate = true;
if (request == null) {
request = [{
- writeQueueNums: 16,
- readQueueNums: 16,
+ writeQueueNums: 8,
+ readQueueNums: 8,
perm: 6,
order: false,
topicName: "",
@@ -355,6 +355,7 @@ module.controller('topicController', ['$scope', 'ngDialog',
'$http', 'Notificati
topicRequestList: request,
allClusterNameList:
Object.keys(resp.data.clusterInfo.clusterAddrTable),
allBrokerNameList: Object.keys(resp.data.brokerServer),
+ allMessageTypeList: resp.data.messageTypes,
bIsUpdate: bIsUpdate,
writeOperationEnabled: $scope.writeOperationEnabled
}
diff --git a/src/main/resources/static/view/pages/topic.html
b/src/main/resources/static/view/pages/topic.html
index bea5ac7..7547c10 100644
--- a/src/main/resources/static/view/pages/topic.html
+++ b/src/main/resources/static/view/pages/topic.html
@@ -63,6 +63,7 @@
<button class="btn btn-raised btn-sm btn-primary"
type="button"
ng-click="openUpdateDialog(topic,
sysFlag)">topic {{'CONFIG' |translate}}
</button>
+<!-- todo 发送消息,根据消息类型判断-->
<button class="btn btn-raised btn-sm btn-primary"
type="button"
ng-show="{{!sysFlag}}"
ng-click="openSendTopicMessageDialog(topic)">{{'SEND_MSG' | translate}}
@@ -189,6 +190,18 @@
<span class="text-danger"
ng-show="addAppForm.topicName.$error.required">{{'TOPIC_NAME'|translate}}不能为空.</span>
</div>
</div>
+ <!-- 设置topic 类型 -->
+ <div class="form-group">
+ <label class="control-label
col-sm-2">{{'MESSAGE_TYPE'|translate}}:</label>
+ <div class="col-sm-10">
+ <select name="mySelectMessageType" chosen
ng-disabled="ngDialogData.bIsUpdate"
+ ng-model="item.messageType"
+ ng-options="messageType as value | translate
disable when messageType=='UNSPECIFIED' for (messageType , value) in
ngDialogData.allMessageTypeList"
+ >
+ <option value=""></option>
+ </select>
+ </div>
+ </div>
<div class="form-group">
<label class="control-label
col-sm-2">{{'WRITE_QUEUE_NUMS'|translate}}:</label>
<div class="col-sm-10">