This is an automated email from the ASF dual-hosted git repository.

gaoxihui pushed a commit to branch app-role-id-extend
in repository https://gitbox.apache.org/repos/asf/ozhera.git


The following commit(s) were added to refs/heads/app-role-id-extend by this 
push:
     new 4621480e feat: allow configuring platform codes to prohibit MQ 
messages from accessing the database
4621480e is described below

commit 4621480ece87f64029f79598acbe3bb1c17058c2
Author: gaoxihui <[email protected]>
AuthorDate: Thu Jan 8 10:51:38 2026 +0800

    feat: allow configuring platform codes to prohibit MQ messages from 
accessing the database
---
 .../app/service/mq/RocketMqHeraAppConsumer.java    | 48 ++++++++++++++++++----
 1 file changed, 39 insertions(+), 9 deletions(-)

diff --git 
a/ozhera-app/app-service/src/main/java/org/apache/ozhera/app/service/mq/RocketMqHeraAppConsumer.java
 
b/ozhera-app/app-service/src/main/java/org/apache/ozhera/app/service/mq/RocketMqHeraAppConsumer.java
index 297f67eb..20319273 100644
--- 
a/ozhera-app/app-service/src/main/java/org/apache/ozhera/app/service/mq/RocketMqHeraAppConsumer.java
+++ 
b/ozhera-app/app-service/src/main/java/org/apache/ozhera/app/service/mq/RocketMqHeraAppConsumer.java
@@ -20,6 +20,7 @@ package org.apache.ozhera.app.service.mq;
 
 import com.alibaba.nacos.api.config.annotation.NacosValue;
 import com.google.gson.Gson;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.ozhera.app.api.message.HeraAppInfoModifyMessage;
 import org.apache.ozhera.app.api.message.HeraAppModifyType;
 import org.apache.ozhera.app.api.model.HeraAppBaseInfoModel;
@@ -30,7 +31,6 @@ import org.apache.ozhera.app.model.HeraAppRole;
 import org.apache.ozhera.app.service.impl.HeraAppBaseInfoService;
 import org.apache.ozhera.app.service.mq.model.HeraAppMessage;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
@@ -47,6 +47,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.PostConstruct;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
@@ -79,10 +80,13 @@ public class RocketMqHeraAppConsumer {
     @NacosValue(value = "${rocketmq.sk}", autoRefreshed = true)
     private String sk;
 
-    //默认为空,根据需要配置
+    //stop mq consumer
     @NacosValue(value = "${stop.mq:false}", autoRefreshed = true)
     private Boolean stopMq;
 
+    @NacosValue(value = "${stop.mq.message.plat.code:}", autoRefreshed = true)
+    private String mqStopPlatCode;
+
     private DefaultMQPushConsumer heraAppMQPushConsumer;
 
     @Autowired
@@ -157,6 +161,35 @@ public class RocketMqHeraAppConsumer {
             log.info("RocketMqHeraAppConsumer# consumeMessage convert 
heraAppMessage : {}", heraAppMessage.toString());
 
             HeraAppBaseInfo changeHeraApp = heraAppMessage.baseInfo();
+
+
+            if(stopMq){
+
+                log.info("Mq consumer stop ...");
+                if(StringUtils.isBlank(mqStopPlatCode)){
+                    log.info("mqStopPlatCode is blank, stop all mq message 
access db!");
+                    return;
+                }
+
+
+                List<Integer> platCodes = 
Arrays.stream(mqStopPlatCode.split(","))
+                        .map(String::trim)
+                        .map(Integer::parseInt)
+                        .collect(Collectors.toList());
+
+                if(CollectionUtils.isEmpty(platCodes)){
+                    log.info("platCodes is empty, stop all mq message access 
db!");
+                    return;
+                }
+
+                if(platCodes.contains(changeHeraApp.getPlatformType())){
+                    log.info("stop mq access db by plat code : " + 
changeHeraApp.getPlatformType() + " ; config plat codes " + platCodes);
+                    return;
+                }
+
+            }
+
+
             HeraAppBaseInfo origHeraApp = 
matchExistHeraApp(heraAppMessage.baseInfo());
 
             if (heraAppMessage.getDelete() != null && 
heraAppMessage.getDelete().intValue() == 1) {
@@ -210,17 +243,14 @@ public class RocketMqHeraAppConsumer {
         return query.get(0);
     }
 
-    private void saveOrUpdateHeraAppRole(List<String> members, String appId, 
Integer platFormType) {
+    private void saveOrUpdateHeraAppRole(List<String> membersP, String appId, 
Integer platFormType) {
 
-        if(stopMq){
-            log.info("Mq consumer stop ...");
+        log.info("RocketMqHeraAppConsumer#saveOrUpdateHeraAppRole 
appId:{},platFormType:{},members:{}", appId, platFormType, membersP);
+        if (CollectionUtils.isEmpty(membersP)) {
             return;
         }
 
-        log.info("RocketMqHeraAppConsumer#saveOrUpdateHeraAppRole 
appId:{},platFormType:{},members:{}", appId, platFormType, members);
-        if (CollectionUtils.isEmpty(members)) {
-            return;
-        }
+        List<String> members = 
membersP.stream().distinct().collect(Collectors.toList());
 
         HeraAppRole role = new HeraAppRole();
         role.setRole(0);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to