This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new af9ed35  [Feature][MasterServer] add cache for workGroup and schedule 
#6987 (#7161)
af9ed35 is described below

commit af9ed352958f5204aa8561be28bc9b8874d9858a
Author: zwZjut <[email protected]>
AuthorDate: Sat Dec 4 16:59:05 2021 +0800

    [Feature][MasterServer] add cache for workGroup and schedule #6987 (#7161)
    
    * to #6987
    
    * to #6987
    
    * to #6987
    
    * to #6987
    
    * to #6987
    
    * to #6987: @Param -> @Name
    
    * to #6987: fix Sonar
    
    * to #6987: fix Sonar
    
    Co-authored-by: honghuo.zw <[email protected]>
---
 .../api/aspect/CacheEvictAspect.java               | 40 ++++++++++++++++++++++
 .../dolphinscheduler/common/enums/CacheType.java   |  4 ++-
 .../dao/mapper/ScheduleMapper.java                 | 12 +++++++
 .../dao/mapper/WorkerGroupMapper.java              | 15 ++++++++
 .../server/master/processor/CacheProcessor.java    | 32 +++++++++++++++++
 5 files changed, 102 insertions(+), 1 deletion(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
index a989b6c..336be21 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
@@ -22,7 +22,12 @@ import 
org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
 import org.apache.dolphinscheduler.service.cache.CacheNotifyService;
 import org.apache.dolphinscheduler.service.cache.impl.CacheKeyGenerator;
 
+import java.lang.annotation.Annotation;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
 
 import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.annotation.Around;
@@ -30,8 +35,12 @@ import org.aspectj.lang.annotation.Aspect;
 import org.aspectj.lang.annotation.Pointcut;
 import org.aspectj.lang.reflect.MethodSignature;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.properties.bind.Name;
 import org.springframework.cache.annotation.CacheConfig;
 import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.expression.EvaluationContext;
+import org.springframework.expression.spel.standard.SpelExpressionParser;
+import org.springframework.expression.spel.support.StandardEvaluationContext;
 import org.springframework.stereotype.Component;
 
 /**
@@ -72,6 +81,10 @@ public class CacheEvictAspect {
             if (method.getName().equalsIgnoreCase(UPDATE_BY_ID) && args.length 
== 1) {
                 Object updateObj = args[0];
                 cacheNotifyService.notifyMaster(new 
CacheExpireCommand(cacheType, updateObj).convert2Command());
+            } else if (!cacheEvict.key().isEmpty()) {
+                List<Name> paramsList = getParamAnnotationsByType(method, 
Name.class);
+                String key = parseKey(cacheEvict.key(), 
paramsList.stream().map(o -> o.value()).collect(Collectors.toList()), 
Arrays.asList(args));
+                cacheNotifyService.notifyMaster(new 
CacheExpireCommand(cacheType, key).convert2Command());
             } else {
                 Object key = cacheKeyGenerator.generate(target, method, args);
                 cacheNotifyService.notifyMaster(new 
CacheExpireCommand(cacheType, key).convert2Command());
@@ -99,4 +112,31 @@ public class CacheEvictAspect {
         }
         return null;
     }
+
+    private String parseKey(String key, List<String> paramNameList, 
List<Object> paramList) {
+        SpelExpressionParser spelParser = new SpelExpressionParser();
+        EvaluationContext ctx = new StandardEvaluationContext();
+        for (int i = 0; i < paramNameList.size(); i++) {
+            ctx.setVariable(paramNameList.get(i), paramList.get(i));
+        }
+        Object obj = spelParser.parseExpression(key).getValue(ctx);
+        if (null == obj) {
+            throw new RuntimeException("parseKey error");
+        }
+        return obj.toString();
+    }
+
+    private <T extends Annotation> List<T> getParamAnnotationsByType(Method 
method, Class<T> annotationClass) {
+        List<T> annotationsList = new ArrayList<>();
+        Annotation[][] annotations = method.getParameterAnnotations();
+        for (int i = 0; i < annotations.length; i++) {
+            Annotation[] annotationsI = annotations[i];
+            for (Annotation annotation : annotationsI) {
+                if (annotation.annotationType().equals(annotationClass)) {
+                    annotationsList.add((T) annotation);
+                }
+            }
+        }
+        return annotationsList;
+    }
 }
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java
index f1921db..f845b20 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java
@@ -23,7 +23,9 @@ public enum CacheType {
     QUEUE("queue"),
     PROCESS_DEFINITION("processDefinition"),
     PROCESS_TASK_RELATION("processTaskRelation"),
-    TASK_DEFINITION("taskDefinition");
+    TASK_DEFINITION("taskDefinition"),
+    WORKER_GROUP("workerGroup"),
+    SCHEDULE("schedule");
 
     CacheType(String cacheName) {
         this.cacheName = cacheName;
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
index be7369c..1dc7cff 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
@@ -22,12 +22,18 @@ import org.apache.ibatis.annotations.Param;
 
 import java.util.List;
 
+import org.springframework.boot.context.properties.bind.Name;
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+
 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 
 /**
  * scheduler mapper interface
  */
+@CacheConfig(cacheNames = "schedule", keyGenerator = "cacheKeyGenerator")
 public interface ScheduleMapper extends BaseMapper<Schedule> {
 
     /**
@@ -67,6 +73,12 @@ public interface ScheduleMapper extends BaseMapper<Schedule> 
{
      * @param processDefinitionCode processDefinitionCode
      * @return schedule list
      */
+    @Cacheable(sync = true)
     List<Schedule> 
queryReleaseSchedulerListByProcessDefinitionCode(@Param("processDefinitionCode")
 long processDefinitionCode);
 
+    @CacheEvict(key = "#entity.processDefinitionCode")
+    int insert(@Name("entity") Schedule entity);
+
+    @CacheEvict
+    int updateById(@Name("entity") @Param("et")Schedule entity);
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
index 21af4b3..b932c0b 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
@@ -23,19 +23,34 @@ import org.apache.ibatis.annotations.Param;
 
 import java.util.List;
 
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+
 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 
 /**
  * worker group mapper interface
  */
+@CacheConfig(cacheNames = "workerGroup", keyGenerator = "cacheKeyGenerator")
 public interface WorkerGroupMapper extends BaseMapper<WorkerGroup> {
 
     /**
      * query all worker group
      * @return worker group list
      */
+    @Cacheable(sync = true, key = "'all'")
     List<WorkerGroup> queryAllWorkerGroup();
 
+    @CacheEvict
+    int deleteById(Integer id);
+
+    @CacheEvict
+    int insert(WorkerGroup entity);
+
+    @CacheEvict
+    int updateById(@Param("et") WorkerGroup entity);
+
     /**
      * query worer grouop by name
      * @param name name
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
index 778d1ae..c66a57b 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
@@ -22,9 +22,11 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
 import org.apache.dolphinscheduler.dao.entity.Queue;
+import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
 import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -108,6 +110,18 @@ public class CacheProcessor implements 
NettyRequestProcessor {
                     processTaskRelationCacheExpire(processTaskRelation);
                 }
                 break;
+            case WORKER_GROUP:
+                if (object instanceof WorkerGroup) {
+                    WorkerGroup workerGroup = (WorkerGroup) object;
+                    workerGroupCacheExpire(workerGroup);
+                }
+                break;
+            case SCHEDULE:
+                if (object instanceof Schedule) {
+                    Schedule schedule = (Schedule) object;
+                    scheduleCacheExpire(schedule);
+                }
+                break;
             default:
                 logger.error("no support cache type:{}", cacheType);
         }
@@ -173,4 +187,22 @@ public class CacheProcessor implements 
NettyRequestProcessor {
                     CacheType.TASK_DEFINITION.getCacheName(), 
taskDefinition.getCode() + "_" + taskDefinition.getVersion());
         }
     }
+
+    private void workerGroupCacheExpire(WorkerGroup workerGroup) {
+        Cache cache = 
cacheManager.getCache(CacheType.WORKER_GROUP.getCacheName());
+        if (cache != null) {
+            cache.evict("all");
+            logger.info("cache evict, type:{}, key:{}",
+                    CacheType.WORKER_GROUP.getCacheName(), "all");
+        }
+    }
+
+    private void scheduleCacheExpire(Schedule schedule) {
+        Cache cache = cacheManager.getCache(CacheType.SCHEDULE.getCacheName());
+        if (cache != null) {
+            cache.evict(schedule.getProcessDefinitionCode());
+            logger.info("cache evict, type:{}, key:{}",
+                    CacheType.SCHEDULE.getCacheName(), 
schedule.getProcessDefinitionCode());
+        }
+    }
 }

Reply via email to