This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new af9ed35 [Feature][MasterServer] add cache for workGroup and schedule
#6987 (#7161)
af9ed35 is described below
commit af9ed352958f5204aa8561be28bc9b8874d9858a
Author: zwZjut <[email protected]>
AuthorDate: Sat Dec 4 16:59:05 2021 +0800
[Feature][MasterServer] add cache for workGroup and schedule #6987 (#7161)
* to #6987
* to #6987
* to #6987
* to #6987
* to #6987
* to #6987: @Param -> @Name
* to #6987: fix Sonar
* to #6987: fix Sonar
Co-authored-by: honghuo.zw <[email protected]>
---
.../api/aspect/CacheEvictAspect.java | 40 ++++++++++++++++++++++
.../dolphinscheduler/common/enums/CacheType.java | 4 ++-
.../dao/mapper/ScheduleMapper.java | 12 +++++++
.../dao/mapper/WorkerGroupMapper.java | 15 ++++++++
.../server/master/processor/CacheProcessor.java | 32 +++++++++++++++++
5 files changed, 102 insertions(+), 1 deletion(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
index a989b6c..336be21 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
@@ -22,7 +22,12 @@ import
org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
import org.apache.dolphinscheduler.service.cache.CacheNotifyService;
import org.apache.dolphinscheduler.service.cache.impl.CacheKeyGenerator;
+import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
@@ -30,8 +35,12 @@ import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.expression.EvaluationContext;
+import org.springframework.expression.spel.standard.SpelExpressionParser;
+import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.stereotype.Component;
/**
@@ -72,6 +81,10 @@ public class CacheEvictAspect {
if (method.getName().equalsIgnoreCase(UPDATE_BY_ID) && args.length
== 1) {
Object updateObj = args[0];
cacheNotifyService.notifyMaster(new
CacheExpireCommand(cacheType, updateObj).convert2Command());
+ } else if (!cacheEvict.key().isEmpty()) {
+ List<Name> paramsList = getParamAnnotationsByType(method,
Name.class);
+ String key = parseKey(cacheEvict.key(),
paramsList.stream().map(o -> o.value()).collect(Collectors.toList()),
Arrays.asList(args));
+ cacheNotifyService.notifyMaster(new
CacheExpireCommand(cacheType, key).convert2Command());
} else {
Object key = cacheKeyGenerator.generate(target, method, args);
cacheNotifyService.notifyMaster(new
CacheExpireCommand(cacheType, key).convert2Command());
@@ -99,4 +112,31 @@ public class CacheEvictAspect {
}
return null;
}
+
+ /**
+ * Evaluates a SpEL expression and returns the result as a string.
+ * <p>
+ * Each entry of {@code paramNameList} is registered as an expression variable bound to the
+ * element of {@code paramList} at the same index, so {@code key} can refer to it as
+ * {@code #name}.
+ * <p>
+ * NOTE(review): names and values are paired purely by list position. The caller in this class
+ * passes the collected {@code @Name} values together with the full argument list, so the
+ * pairing is only right when no parameter without {@code @Name} precedes an annotated one -
+ * confirm against the annotated mapper methods.
+ *
+ * @param key SpEL expression to evaluate
+ * @param paramNameList variable names to expose to the expression
+ * @param paramList values for those variables, read at the same index as the name
+ * @return {@code toString()} of the evaluated expression value
+ * @throws RuntimeException if the expression evaluates to {@code null}
+ */
+ private String parseKey(String key, List<String> paramNameList,
List<Object> paramList) {
+ SpelExpressionParser spelParser = new SpelExpressionParser();
+ EvaluationContext ctx = new StandardEvaluationContext();
+ for (int i = 0; i < paramNameList.size(); i++) {
+ ctx.setVariable(paramNameList.get(i), paramList.get(i)); // name i -> value i
+ }
+ Object obj = spelParser.parseExpression(key).getValue(ctx);
+ if (null == obj) { // a null result cannot be used as a cache key
+ throw new RuntimeException("parseKey error");
+ }
+ return obj.toString();
+ }
+
+ /**
+ * Collects every parameter annotation of the given type declared on a method.
+ * <p>
+ * Parameters are visited in declaration order and each annotation whose
+ * {@code annotationType()} equals {@code annotationClass} is appended to the result.
+ * Parameters without a matching annotation contribute nothing, so an index in the returned
+ * list is not necessarily the index of the parameter it came from.
+ *
+ * @param method method whose parameter annotations are inspected
+ * @param annotationClass annotation type to look for
+ * @param <T> annotation type
+ * @return matching annotations in parameter order; empty if none match
+ */
+ private <T extends Annotation> List<T> getParamAnnotationsByType(Method
method, Class<T> annotationClass) {
+ List<T> annotationsList = new ArrayList<>();
+ Annotation[][] annotations = method.getParameterAnnotations(); // one inner array per parameter
+ for (int i = 0; i < annotations.length; i++) {
+ Annotation[] annotationsI = annotations[i];
+ for (Annotation annotation : annotationsI) {
+ if (annotation.annotationType().equals(annotationClass)) {
+ annotationsList.add((T) annotation); // unchecked cast, guarded by the type check above
+ }
+ }
+ }
+ return annotationsList;
+ }
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java
index f1921db..f845b20 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java
@@ -23,7 +23,9 @@ public enum CacheType {
QUEUE("queue"),
PROCESS_DEFINITION("processDefinition"),
PROCESS_TASK_RELATION("processTaskRelation"),
- TASK_DEFINITION("taskDefinition");
+ TASK_DEFINITION("taskDefinition"),
+ WORKER_GROUP("workerGroup"),
+ SCHEDULE("schedule");
CacheType(String cacheName) {
this.cacheName = cacheName;
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
index be7369c..1dc7cff 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
@@ -22,12 +22,18 @@ import org.apache.ibatis.annotations.Param;
import java.util.List;
+import org.springframework.boot.context.properties.bind.Name;
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
/**
* scheduler mapper interface
*/
+@CacheConfig(cacheNames = "schedule", keyGenerator = "cacheKeyGenerator")
public interface ScheduleMapper extends BaseMapper<Schedule> {
/**
@@ -67,6 +73,12 @@ public interface ScheduleMapper extends BaseMapper<Schedule>
{
* @param processDefinitionCode processDefinitionCode
* @return schedule list
*/
+ @Cacheable(sync = true)
List<Schedule>
queryReleaseSchedulerListByProcessDefinitionCode(@Param("processDefinitionCode")
long processDefinitionCode);
+ @CacheEvict(key = "#entity.processDefinitionCode")
+ int insert(@Name("entity") Schedule entity);
+
+ @CacheEvict
+ int updateById(@Name("entity") @Param("et")Schedule entity);
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
index 21af4b3..b932c0b 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
@@ -23,19 +23,34 @@ import org.apache.ibatis.annotations.Param;
import java.util.List;
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* worker group mapper interface
*/
+@CacheConfig(cacheNames = "workerGroup", keyGenerator = "cacheKeyGenerator")
public interface WorkerGroupMapper extends BaseMapper<WorkerGroup> {
/**
* query all worker group
* @return worker group list
*/
+ @Cacheable(sync = true, key = "'all'")
List<WorkerGroup> queryAllWorkerGroup();
+ @CacheEvict
+ int deleteById(Integer id);
+
+ @CacheEvict
+ int insert(WorkerGroup entity);
+
+ @CacheEvict
+ int updateById(@Param("et") WorkerGroup entity);
+
/**
* query worker group by name
* @param name name
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
index 778d1ae..c66a57b 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
@@ -22,9 +22,11 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Queue;
+import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -108,6 +110,18 @@ public class CacheProcessor implements
NettyRequestProcessor {
processTaskRelationCacheExpire(processTaskRelation);
}
break;
+ case WORKER_GROUP:
+ if (object instanceof WorkerGroup) {
+ WorkerGroup workerGroup = (WorkerGroup) object;
+ workerGroupCacheExpire(workerGroup);
+ }
+ break;
+ case SCHEDULE:
+ if (object instanceof Schedule) {
+ Schedule schedule = (Schedule) object;
+ scheduleCacheExpire(schedule);
+ }
+ break;
default:
logger.error("no support cache type:{}", cacheType);
}
@@ -173,4 +187,22 @@ public class CacheProcessor implements
NettyRequestProcessor {
CacheType.TASK_DEFINITION.getCacheName(),
taskDefinition.getCode() + "_" + taskDefinition.getVersion());
}
}
+
+ /**
+ * Evicts the cached worker-group list.
+ * <p>
+ * Looks up the cache named by {@code CacheType.WORKER_GROUP} and, if it exists, evicts the
+ * entry stored under the fixed key {@code "all"} (the key declared on
+ * {@code WorkerGroupMapper.queryAllWorkerGroup}) and logs the eviction. Nothing happens when
+ * the cache is not available.
+ *
+ * @param workerGroup the changed worker group; not read by this method
+ */
+ private void workerGroupCacheExpire(WorkerGroup workerGroup) {
+ Cache cache =
cacheManager.getCache(CacheType.WORKER_GROUP.getCacheName());
+ if (cache != null) {
+ cache.evict("all");
+ logger.info("cache evict, type:{}, key:{}",
+ CacheType.WORKER_GROUP.getCacheName(), "all");
+ }
+ }
+
+ /**
+ * Evicts the schedule cache entry belonging to a schedule's process definition.
+ * <p>
+ * Looks up the cache named by {@code CacheType.SCHEDULE} and, if it exists, evicts the entry
+ * keyed by {@code schedule.getProcessDefinitionCode()} and logs the eviction. Nothing happens
+ * when the cache is not available.
+ * <p>
+ * NOTE(review): the cached query on {@code ScheduleMapper} declares no explicit key and
+ * relies on the configured key generator - confirm it produces the same key that is evicted
+ * here.
+ *
+ * @param schedule the changed schedule whose process definition code selects the entry
+ */
+ private void scheduleCacheExpire(Schedule schedule) {
+ Cache cache = cacheManager.getCache(CacheType.SCHEDULE.getCacheName());
+ if (cache != null) {
+ cache.evict(schedule.getProcessDefinitionCode());
+ logger.info("cache evict, type:{}, key:{}",
+ CacheType.SCHEDULE.getCacheName(),
schedule.getProcessDefinitionCode());
+ }
+ }
}