This is an automated email from the ASF dual-hosted git repository.
zhangxiaowei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git
The following commit(s) were added to refs/heads/master by this push:
new b915d7f4 feat: allow configuring platform codes to prohibit MQ
messages from accessing the database (#628)
b915d7f4 is described below
commit b915d7f4799624e59a1d3f2b66867e1af7c3b4f2
Author: gaoxh <[email protected]>
AuthorDate: Thu Jan 8 14:27:19 2026 +0800
feat: allow configuring platform codes to prohibit MQ messages from
accessing the database (#628)
* refactor: extend the id column in the hera_app_role table to the bigint
type
* feat: allow configuring platform codes to prohibit MQ messages from
accessing the database
---------
Co-authored-by: gaoxihui <[email protected]>
---
.../app/service/mq/RocketMqHeraAppConsumer.java | 48 ++++++++++++++++++----
1 file changed, 39 insertions(+), 9 deletions(-)
diff --git
a/ozhera-app/app-service/src/main/java/org/apache/ozhera/app/service/mq/RocketMqHeraAppConsumer.java
b/ozhera-app/app-service/src/main/java/org/apache/ozhera/app/service/mq/RocketMqHeraAppConsumer.java
index 297f67eb..20319273 100644
---
a/ozhera-app/app-service/src/main/java/org/apache/ozhera/app/service/mq/RocketMqHeraAppConsumer.java
+++
b/ozhera-app/app-service/src/main/java/org/apache/ozhera/app/service/mq/RocketMqHeraAppConsumer.java
@@ -20,6 +20,7 @@ package org.apache.ozhera.app.service.mq;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import com.google.gson.Gson;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.ozhera.app.api.message.HeraAppInfoModifyMessage;
import org.apache.ozhera.app.api.message.HeraAppModifyType;
import org.apache.ozhera.app.api.model.HeraAppBaseInfoModel;
@@ -30,7 +31,6 @@ import org.apache.ozhera.app.model.HeraAppRole;
import org.apache.ozhera.app.service.impl.HeraAppBaseInfoService;
import org.apache.ozhera.app.service.mq.model.HeraAppMessage;
import lombok.extern.slf4j.Slf4j;
-import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
@@ -47,6 +47,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -79,10 +80,13 @@ public class RocketMqHeraAppConsumer {
@NacosValue(value = "${rocketmq.sk}", autoRefreshed = true)
private String sk;
- //默认为空,根据需要配置
+ //stop mq consumer
@NacosValue(value = "${stop.mq:false}", autoRefreshed = true)
private Boolean stopMq;
+ @NacosValue(value = "${stop.mq.message.plat.code:}", autoRefreshed = true)
+ private String mqStopPlatCode;
+
private DefaultMQPushConsumer heraAppMQPushConsumer;
@Autowired
@@ -157,6 +161,35 @@ public class RocketMqHeraAppConsumer {
log.info("RocketMqHeraAppConsumer# consumeMessage convert
heraAppMessage : {}", heraAppMessage.toString());
HeraAppBaseInfo changeHeraApp = heraAppMessage.baseInfo();
+
+
+ if(stopMq){
+
+ log.info("Mq consumer stop ...");
+ if(StringUtils.isBlank(mqStopPlatCode)){
+ log.info("mqStopPlatCode is blank, stop all mq message
access db!");
+ return;
+ }
+
+
+ List<Integer> platCodes =
Arrays.stream(mqStopPlatCode.split(","))
+ .map(String::trim)
+ .map(Integer::parseInt)
+ .collect(Collectors.toList());
+
+ if(CollectionUtils.isEmpty(platCodes)){
+ log.info("platCodes is empty, stop all mq message access
db!");
+ return;
+ }
+
+ if(platCodes.contains(changeHeraApp.getPlatformType())){
+ log.info("stop mq access db by plat code : " +
changeHeraApp.getPlatformType() + " ; config plat codes " + platCodes);
+ return;
+ }
+
+ }
+
+
HeraAppBaseInfo origHeraApp =
matchExistHeraApp(heraAppMessage.baseInfo());
if (heraAppMessage.getDelete() != null &&
heraAppMessage.getDelete().intValue() == 1) {
@@ -210,17 +243,14 @@ public class RocketMqHeraAppConsumer {
return query.get(0);
}
- private void saveOrUpdateHeraAppRole(List<String> members, String appId,
Integer platFormType) {
+ private void saveOrUpdateHeraAppRole(List<String> membersP, String appId,
Integer platFormType) {
- if(stopMq){
- log.info("Mq consumer stop ...");
+ log.info("RocketMqHeraAppConsumer#saveOrUpdateHeraAppRole
appId:{},platFormType:{},members:{}", appId, platFormType, membersP);
+ if (CollectionUtils.isEmpty(membersP)) {
return;
}
- log.info("RocketMqHeraAppConsumer#saveOrUpdateHeraAppRole
appId:{},platFormType:{},members:{}", appId, platFormType, members);
- if (CollectionUtils.isEmpty(members)) {
- return;
- }
+ List<String> members =
membersP.stream().distinct().collect(Collectors.toList());
HeraAppRole role = new HeraAppRole();
role.setRole(0);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]