This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new a80a7b3 [Feature-6963][MasterServer] unified cache manager (#7187)
a80a7b3 is described below
commit a80a7b3fcf6154764ef82b3b6536c8ee54f90444
Author: wind <[email protected]>
AuthorDate: Tue Dec 7 15:55:22 2021 +0800
[Feature-6963][MasterServer] unified cache manager (#7187)
* unified cache
* reduce db select
* checkstyle
Co-authored-by: caishunfeng <[email protected]>
---
.../api/aspect/CacheEvictAspect.java | 32 +++--
.../dao/mapper/ProcessDefinitionMapper.java | 9 +-
.../dao/mapper/ProcessTaskRelationMapper.java | 43 ++++---
.../dao/mapper/ScheduleMapper.java | 2 +-
.../dao/mapper/TaskDefinitionLogMapper.java | 11 +-
.../dolphinscheduler/dao/mapper/TenantMapper.java | 5 +-
.../dolphinscheduler/dao/mapper/UserMapper.java | 5 +-
.../dao/mapper/WorkerGroupMapper.java | 10 +-
.../remote/command/CacheExpireCommand.java | 18 +--
.../command/cache/CacheExpireCommandTest.java | 5 +-
.../server/master/config/MasterConfig.java | 9 --
.../server/master/processor/CacheProcessor.java | 143 +--------------------
.../server/master/runner/EventExecuteService.java | 7 +-
.../master/runner/WorkflowExecuteThread.java | 44 +++++--
.../src/main/resources/application-master.yaml | 8 +-
.../master/processor/CacheProcessorTest.java | 2 +-
.../service/process/ProcessService.java | 15 ++-
.../service/cache/CacheNotifyServiceTest.java | 2 +-
18 files changed, 138 insertions(+), 232 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
index 336be21..39d6975 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
@@ -22,6 +22,8 @@ import
org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
import org.apache.dolphinscheduler.service.cache.CacheNotifyService;
import org.apache.dolphinscheduler.service.cache.impl.CacheKeyGenerator;
+import org.apache.commons.lang3.StringUtils;
+
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.ArrayList;
@@ -34,6 +36,8 @@ import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig;
@@ -50,7 +54,12 @@ import org.springframework.stereotype.Component;
@Component
public class CacheEvictAspect {
- private static final String UPDATE_BY_ID = "updateById";
+ private static final Logger logger =
LoggerFactory.getLogger(CacheEvictAspect.class);
+
+ /**
+ * symbol of spring el
+ */
+ private static final String EL_SYMBOL = "#";
@Autowired
private CacheKeyGenerator cacheKeyGenerator;
@@ -77,17 +86,18 @@ public class CacheEvictAspect {
CacheType cacheType = getCacheType(cacheConfig, cacheEvict);
if (cacheType != null) {
- // todo use springEL is better
- if (method.getName().equalsIgnoreCase(UPDATE_BY_ID) && args.length
== 1) {
- Object updateObj = args[0];
- cacheNotifyService.notifyMaster(new
CacheExpireCommand(cacheType, updateObj).convert2Command());
- } else if (!cacheEvict.key().isEmpty()) {
- List<Name> paramsList = getParamAnnotationsByType(method,
Name.class);
- String key = parseKey(cacheEvict.key(),
paramsList.stream().map(o -> o.value()).collect(Collectors.toList()),
Arrays.asList(args));
- cacheNotifyService.notifyMaster(new
CacheExpireCommand(cacheType, key).convert2Command());
+ String cacheKey;
+ if (cacheEvict.key().isEmpty()) {
+ cacheKey = (String) cacheKeyGenerator.generate(target, method,
args);
} else {
- Object key = cacheKeyGenerator.generate(target, method, args);
- cacheNotifyService.notifyMaster(new
CacheExpireCommand(cacheType, key).convert2Command());
+ cacheKey = cacheEvict.key();
+ List<Name> paramsList = getParamAnnotationsByType(method,
Name.class);
+ if (cacheEvict.key().contains(EL_SYMBOL)) {
+ cacheKey = parseKey(cacheEvict.key(),
paramsList.stream().map(o -> o.value()).collect(Collectors.toList()),
Arrays.asList(args));
+ }
+ }
+ if (StringUtils.isNotEmpty(cacheKey)) {
+ cacheNotifyService.notifyMaster(new
CacheExpireCommand(cacheType, cacheKey).convert2Command());
}
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
index ca287e2..954b55b 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
@@ -27,6 +27,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
+import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
@@ -52,8 +53,8 @@ public interface ProcessDefinitionMapper extends
BaseMapper<ProcessDefinition> {
/**
* update
*/
- @CacheEvict
- int updateById(@Param("et") ProcessDefinition processDefinition);
+ @CacheEvict(key = "#processDefinition.code")
+ int updateById(@Name("processDefinition") @Param("et") ProcessDefinition
processDefinition);
/**
* query process definition by code list
@@ -69,8 +70,8 @@ public interface ProcessDefinitionMapper extends
BaseMapper<ProcessDefinition> {
* @param code code
* @return delete result
*/
- @CacheEvict
- int deleteByCode(@Param("code") long code);
+ @CacheEvict(key = "#code")
+ int deleteByCode(@Name("code") @Param("code") long code);
/**
* verify process definition by name
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
index 5b3ea75..010b8e4 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
@@ -25,6 +25,7 @@ import org.apache.ibatis.annotations.Param;
import java.util.List;
import java.util.Map;
+import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
@@ -51,8 +52,8 @@ public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelatio
/**
* update
*/
- @CacheEvict
- int updateById(@Param("et") ProcessTaskRelation processTaskRelation);
+ @CacheEvict(key = "#processTaskRelation.projectCode + '_' +
#processTaskRelation.processDefinitionCode")
+ int updateById(@Name("processTaskRelation") @Param("et")
ProcessTaskRelation processTaskRelation);
/**
* process task relation by taskCode
@@ -77,9 +78,9 @@ public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelatio
* @param processCode processCode
* @return int
*/
- @CacheEvict
- int deleteByCode(@Param("projectCode") long projectCode,
- @Param("processCode") long processCode);
+ @CacheEvict(key = "#projectCode + '_' + #processCode")
+ int deleteByCode(@Name("projectCode") @Param("projectCode") long
projectCode,
+ @Name("processCode") @Param("processCode") long
processCode);
/**
* batch insert process task relation
@@ -101,7 +102,7 @@ public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelatio
* query upstream process task relation by taskCode
*
* @param projectCode projectCode
- * @param taskCode taskCode
+ * @param taskCode taskCode
* @return ProcessTaskRelation
*/
List<ProcessTaskRelation> queryUpstreamByCode(@Param("projectCode") long
projectCode, @Param("taskCode") long taskCode);
@@ -110,7 +111,7 @@ public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelatio
* query downstream process task relation by taskCode
*
* @param projectCode projectCode
- * @param taskCode taskCode
+ * @param taskCode taskCode
* @return ProcessTaskRelation
*/
List<ProcessTaskRelation> queryDownstreamByCode(@Param("projectCode") long
projectCode, @Param("taskCode") long taskCode);
@@ -118,24 +119,24 @@ public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelatio
/**
* query task relation by codes
*
- * @param projectCode projectCode
- * @param taskCode taskCode
+ * @param projectCode projectCode
+ * @param taskCode taskCode
* @param preTaskCodes preTaskCode list
* @return ProcessTaskRelation
*/
- List<ProcessTaskRelation> queryUpstreamByCodes(@Param("projectCode") long
projectCode, @Param("taskCode") long taskCode,@Param("preTaskCodes") Long[]
preTaskCodes);
+ List<ProcessTaskRelation> queryUpstreamByCodes(@Param("projectCode") long
projectCode, @Param("taskCode") long taskCode, @Param("preTaskCodes") Long[]
preTaskCodes);
/**
* count upstream by codes
*
* @param projectCode projectCode
- * @param taskCode taskCode
- * @param processDefinitionCodes processDefinitionCodes
+ * @param taskCode taskCode
+ * @param processDefinitionCodes processDefinitionCodes
* @return upstream count list group by process definition code
*/
List<Map<String, Long>>
countUpstreamByCodeGroupByProcessDefinitionCode(@Param("projectCode") long
projectCode,
-
@Param("processDefinitionCodes") Long[] processDefinitionCodes,
-
@Param("taskCode") long taskCode);
+
@Param("processDefinitionCodes") Long[] processDefinitionCodes,
+
@Param("taskCode") long taskCode);
/**
* batch update process task relation pre task
@@ -148,10 +149,10 @@ public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelatio
/**
* query by code
*
- * @param projectCode projectCode
+ * @param projectCode projectCode
* @param processDefinitionCode processDefinitionCode
- * @param preTaskCode preTaskCode
- * @param postTaskCode postTaskCode
+ * @param preTaskCode preTaskCode
+ * @param postTaskCode postTaskCode
* @return ProcessTaskRelation
*/
List<ProcessTaskRelation> queryByCode(@Param("projectCode") long
projectCode,
@@ -162,7 +163,7 @@ public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelatio
/**
* delete process task relation
*
- * @param processTaskRelationLog processTaskRelationLog
+ * @param processTaskRelationLog processTaskRelationLog
* @return int
*/
int deleteRelation(@Param("processTaskRelationLog") ProcessTaskRelationLog
processTaskRelationLog);
@@ -170,10 +171,10 @@ public interface ProcessTaskRelationMapper extends
BaseMapper<ProcessTaskRelatio
/**
* count by code
*
- * @param projectCode projectCode
+ * @param projectCode projectCode
* @param processDefinitionCode processDefinitionCode
- * @param preTaskCode preTaskCode
- * @param postTaskCode postTaskCode
+ * @param preTaskCode preTaskCode
+ * @param postTaskCode postTaskCode
* @return ProcessTaskRelation
*/
int countByCode(@Param("projectCode") long projectCode,
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
index 1dc7cff..52fabae 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
@@ -79,6 +79,6 @@ public interface ScheduleMapper extends BaseMapper<Schedule> {
@CacheEvict(key = "#entity.processDefinitionCode")
int insert(@Name("entity") Schedule entity);
- @CacheEvict
+ @CacheEvict(key = "#entity.processDefinitionCode")
int updateById(@Name("entity") @Param("et")Schedule entity);
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
index 75e1b29..30ee855 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
@@ -25,6 +25,7 @@ import org.apache.ibatis.annotations.Param;
import java.util.Collection;
import java.util.List;
+import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
@@ -53,15 +54,15 @@ public interface TaskDefinitionLogMapper extends
BaseMapper<TaskDefinitionLog> {
* @param version version
* @return task definition log
*/
- @Cacheable(sync = true, key = "#taskCode + '_' + #taskDefinitionVersion")
+ @Cacheable(sync = true, key = "#code + '_' + #version")
TaskDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long code,
@Param("version") int
version);
/**
* update
*/
- @CacheEvict
- int updateById(@Param("et") TaskDefinitionLog taskDefinitionLog);
+ @CacheEvict(key = "#taskDefinitionLog.code + '_' +
#taskDefinitionLog.version")
+ int updateById(@Name("taskDefinitionLog") @Param("et") TaskDefinitionLog
taskDefinitionLog);
/**
* @param taskDefinitions taskDefinition list
@@ -84,8 +85,8 @@ public interface TaskDefinitionLogMapper extends
BaseMapper<TaskDefinitionLog> {
* @param version task definition version
* @return delete result
*/
- @CacheEvict
- int deleteByCodeAndVersion(@Param("code") long code, @Param("version") int
version);
+ @CacheEvict(key = "#code + '_' #version")
+ int deleteByCodeAndVersion(@Name("code") @Param("code") long code,
@Name("version") @Param("version") int version);
/**
* query the paging task definition version list by pagination info
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java
index 843122f..0def8b8 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.ibatis.annotations.Param;
+import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
@@ -52,8 +53,8 @@ public interface TenantMapper extends BaseMapper<Tenant> {
/**
* update
*/
- @CacheEvict
- int updateById(@Param("et") Tenant tenant);
+ @CacheEvict(key = "#tenant.id")
+ int updateById(@Name("tenant") @Param("et") Tenant tenant);
/**
* query tenant by code
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java
index 1d5ad8f..460c954 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java
@@ -23,6 +23,7 @@ import org.apache.ibatis.annotations.Param;
import java.util.List;
+import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
@@ -52,8 +53,8 @@ public interface UserMapper extends BaseMapper<User> {
/**
* update
*/
- @CacheEvict
- int updateById(@Param("et") User user);
+ @CacheEvict(key = "#user.id")
+ int updateById(@Name("user") @Param("et") User user);
/**
* query all general user
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
index b932c0b..bfe01f8 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
@@ -37,22 +37,24 @@ public interface WorkerGroupMapper extends
BaseMapper<WorkerGroup> {
/**
* query all worker group
+ *
* @return worker group list
*/
- @Cacheable(sync = true, key = "'all'")
+ @Cacheable(sync = true, key = "all")
List<WorkerGroup> queryAllWorkerGroup();
- @CacheEvict
+ @CacheEvict(key = "all")
int deleteById(Integer id);
- @CacheEvict
+ @CacheEvict(key = "all")
int insert(WorkerGroup entity);
- @CacheEvict
+ @CacheEvict(key = "all")
int updateById(@Param("et") WorkerGroup entity);
/**
* query worer grouop by name
+ *
* @param name name
* @return worker group list
*/
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CacheExpireCommand.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CacheExpireCommand.java
index 26170af..a32d4fc 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CacheExpireCommand.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CacheExpireCommand.java
@@ -28,29 +28,23 @@ import java.io.Serializable;
public class CacheExpireCommand implements Serializable {
private CacheType cacheType;
- private Class updateObjClass;
- private String updateObjJson;
+ private String cacheKey;
public CacheExpireCommand() {
super();
}
- public CacheExpireCommand(CacheType cacheType, Object updateObj) {
+ public CacheExpireCommand(CacheType cacheType, String cacheKey) {
this.cacheType = cacheType;
- this.updateObjClass = updateObj.getClass();
- this.updateObjJson = JSONUtils.toJsonString(updateObj);
+ this.cacheKey = cacheKey;
}
public CacheType getCacheType() {
return cacheType;
}
- public Class getUpdateObjClass() {
- return updateObjClass;
- }
-
- public String getUpdateObjJson() {
- return updateObjJson;
+ public String getCacheKey() {
+ return cacheKey;
}
/**
@@ -68,6 +62,6 @@ public class CacheExpireCommand implements Serializable {
@Override
public String toString() {
- return String.format("CacheExpireCommand{CacheType=%s,
updateObjClass=%s, updateObjJson=%s}", cacheType, updateObjClass,
updateObjJson);
+ return String.format("CacheExpireCommand{CacheType=%s, cacheKey=%s}",
cacheType, cacheKey);
}
}
diff --git
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/cache/CacheExpireCommandTest.java
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/cache/CacheExpireCommandTest.java
index 1c91d15..2351234 100644
---
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/cache/CacheExpireCommandTest.java
+++
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/cache/CacheExpireCommandTest.java
@@ -29,10 +29,7 @@ public class CacheExpireCommandTest {
@Test
public void testConvert2Command() {
- CacheExpireCommand cacheExpireCommand = new
CacheExpireCommand(CacheType.TENANT, 1);
- Assert.assertEquals(Integer.class,
cacheExpireCommand.getUpdateObjClass());
- Assert.assertEquals("1", cacheExpireCommand.getUpdateObjJson());
-
+ CacheExpireCommand cacheExpireCommand = new
CacheExpireCommand(CacheType.TENANT, "1");
Command command = cacheExpireCommand.convert2Command();
Assert.assertEquals(CommandType.CACHE_EXPIRE, command.getType());
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index d4a9958..88ecfbd 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -39,7 +39,6 @@ public class MasterConfig {
private int stateWheelInterval;
private double maxCpuLoadAvg;
private double reservedMemory;
- private boolean cacheProcessDefinition;
public int getListenPort() {
return listenPort;
@@ -136,12 +135,4 @@ public class MasterConfig {
public void setReservedMemory(double reservedMemory) {
this.reservedMemory = reservedMemory;
}
-
- public boolean isCacheProcessDefinition() {
- return cacheProcessDefinition;
- }
-
- public void setCacheProcessDefinition(boolean cacheProcessDefinition) {
- this.cacheProcessDefinition = cacheProcessDefinition;
- }
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
index c66a57b..6db7f65 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
@@ -19,14 +19,6 @@ package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.enums.CacheType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
-import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
-import org.apache.dolphinscheduler.dao.entity.Queue;
-import org.apache.dolphinscheduler.dao.entity.Schedule;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
-import org.apache.dolphinscheduler.dao.entity.Tenant;
-import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -67,142 +59,15 @@ public class CacheProcessor implements
NettyRequestProcessor {
cacheManager =
SpringApplicationContext.getBean(CacheManager.class);
}
- Object object =
JSONUtils.parseObject(cacheExpireCommand.getUpdateObjJson(),
cacheExpireCommand.getUpdateObjClass());
- if (object == null) {
+ if (cacheExpireCommand.getCacheKey().isEmpty()) {
return;
}
CacheType cacheType = cacheExpireCommand.getCacheType();
- switch (cacheType) {
- case TENANT:
- if (object instanceof Tenant) {
- Tenant tenant = (Tenant) object;
- tenantCacheExpire(tenant);
- }
- break;
- case USER:
- if (object instanceof User) {
- User user = (User) object;
- userCacheExpire(user);
- }
- break;
- case QUEUE:
- if (object instanceof Queue) {
- Queue queue = (Queue) object;
- queueCacheExpire(queue);
- }
- break;
- case PROCESS_DEFINITION:
- if (object instanceof ProcessDefinition) {
- ProcessDefinition processDefinition = (ProcessDefinition)
object;
- processDefinitionCacheExpire(processDefinition);
- }
- break;
- case TASK_DEFINITION:
- if (object instanceof TaskDefinition) {
- TaskDefinition taskDefinition = (TaskDefinition) object;
- taskDefinitionCacheExpire(taskDefinition);
- }
- break;
- case PROCESS_TASK_RELATION:
- if (object instanceof ProcessTaskRelation) {
- ProcessTaskRelation processTaskRelation =
(ProcessTaskRelation) object;
- processTaskRelationCacheExpire(processTaskRelation);
- }
- break;
- case WORKER_GROUP:
- if (object instanceof WorkerGroup) {
- WorkerGroup workerGroup = (WorkerGroup) object;
- workerGroupCacheExpire(workerGroup);
- }
- break;
- case SCHEDULE:
- if (object instanceof Schedule) {
- Schedule schedule = (Schedule) object;
- scheduleCacheExpire(schedule);
- }
- break;
- default:
- logger.error("no support cache type:{}", cacheType);
- }
-
- // if delete operation, just send key
- if (object instanceof String) {
- Cache cache = cacheManager.getCache(cacheType.getCacheName());
- if (cache != null) {
- cache.evict(object);
- logger.info("cache evict, type:{}, key:{}",
cacheType.getCacheName(), object);
- }
- }
- }
-
- private void tenantCacheExpire(Tenant tenant) {
- Cache cache = cacheManager.getCache(CacheType.TENANT.getCacheName());
- if (cache != null) {
- cache.evict(tenant.getId());
- logger.info("cache evict, type:{}, key:{}",
CacheType.TENANT.getCacheName(), tenant.getId());
- }
- }
-
- private void userCacheExpire(User user) {
- Cache cache = cacheManager.getCache(CacheType.USER.getCacheName());
- if (cache != null) {
- cache.evict(user.getId());
- logger.info("cache evict, type:{}, key:{}",
CacheType.USER.getCacheName(), user.getId());
- }
- }
-
- private void queueCacheExpire(Queue queue) {
- Cache cache = cacheManager.getCache(CacheType.USER.getCacheName());
- if (cache != null) {
- cache.clear();
- logger.info("cache evict, type:{}, clear",
CacheType.USER.getCacheName());
- }
- }
-
- private void processDefinitionCacheExpire(ProcessDefinition
processDefinition) {
- Cache cache =
cacheManager.getCache(CacheType.PROCESS_DEFINITION.getCacheName());
- if (cache != null) {
- cache.evict(processDefinition.getCode());
- cache.evict(processDefinition.getCode() + "_" +
processDefinition.getVersion());
- logger.info("cache evict, type:{}, key:{}",
- CacheType.PROCESS_DEFINITION.getCacheName(),
processDefinition.getCode() + "_" + processDefinition.getVersion());
- }
- }
-
- private void processTaskRelationCacheExpire(ProcessTaskRelation
processTaskRelation) {
- Cache cache =
cacheManager.getCache(CacheType.PROCESS_TASK_RELATION.getCacheName());
- if (cache != null) {
- cache.evict(processTaskRelation.getProjectCode() + "_" +
processTaskRelation.getProcessDefinitionCode());
- logger.info("cache evict, type:{}, key:{}",
- CacheType.PROCESS_TASK_RELATION.getCacheName(),
processTaskRelation.getProjectCode() + "_" +
processTaskRelation.getProcessDefinitionCode());
- }
- }
-
- private void taskDefinitionCacheExpire(TaskDefinition taskDefinition) {
- Cache cache =
cacheManager.getCache(CacheType.TASK_DEFINITION.getCacheName());
- if (cache != null) {
- cache.evict(taskDefinition.getCode() + "_" +
taskDefinition.getVersion());
- logger.info("cache evict, type:{}, key:{}",
- CacheType.TASK_DEFINITION.getCacheName(),
taskDefinition.getCode() + "_" + taskDefinition.getVersion());
- }
- }
-
- private void workerGroupCacheExpire(WorkerGroup workerGroup) {
- Cache cache =
cacheManager.getCache(CacheType.WORKER_GROUP.getCacheName());
- if (cache != null) {
- cache.evict("all");
- logger.info("cache evict, type:{}, key:{}",
- CacheType.WORKER_GROUP.getCacheName(), "all");
- }
- }
-
- private void scheduleCacheExpire(Schedule schedule) {
- Cache cache = cacheManager.getCache(CacheType.SCHEDULE.getCacheName());
+ Cache cache = cacheManager.getCache(cacheType.getCacheName());
if (cache != null) {
- cache.evict(schedule.getProcessDefinitionCode());
- logger.info("cache evict, type:{}, key:{}",
- CacheType.SCHEDULE.getCacheName(),
schedule.getProcessDefinitionCode());
+ cache.evict(cacheExpireCommand.getCacheKey());
+ logger.info("cache evict, type:{}, key:{}",
cacheType.getCacheName(), cacheExpireCommand.getCacheKey());
}
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
index c40618a..3da043c 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.thread.Stopper;
@@ -146,9 +147,11 @@ public class EventExecuteService extends Thread {
}
private void notifyProcessChanged() {
- Map<ProcessInstance, TaskInstance> fatherMaps
- =
processService.notifyProcessList(processInstanceId, 0);
+ if (Flag.NO ==
workflowExecuteThread.getProcessInstance().getIsSubProcess()) {
+ return;
+ }
+ Map<ProcessInstance, TaskInstance> fatherMaps =
processService.notifyProcessList(processInstanceId);
for (ProcessInstance processInstance :
fatherMaps.keySet()) {
String address =
NetUtils.getAddr(masterConfig.getListenPort());
if
(processInstance.getHost().equalsIgnoreCase(address)) {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 9116ce5..fe60ec6 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -51,9 +51,11 @@ import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
@@ -84,6 +86,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -682,11 +685,10 @@ public class WorkflowExecuteThread implements Runnable {
ProjectUser projectUser =
processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
processAlertManager.sendAlertProcessInstance(processInstance,
getValidTaskList(), projectUser);
}
- List<TaskInstance> taskInstances =
processService.findValidTaskListByProcessId(processInstance.getId());
- ProjectUser projectUser =
processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
- processAlertManager.sendAlertProcessInstance(processInstance,
taskInstances, projectUser);
- //release task group
- processService.releaseAllTaskGroup(processInstance.getId());
+ if (checkTaskQueue()) {
+ //release task group
+ processService.releaseAllTaskGroup(processInstance.getId());
+ }
}
public void checkSerialProcess(ProcessDefinition processDefinition) {
@@ -725,8 +727,10 @@ public class WorkflowExecuteThread implements Runnable {
processInstance.setProcessDefinition(processDefinition);
List<TaskInstance> recoverNodeList =
getStartTaskInstanceList(processInstance.getCommandParam());
- List<TaskNode> taskNodeList =
-
processService.transformTask(processService.findRelationByCode(processDefinition.getProjectCode(),
processDefinition.getCode()), Lists.newArrayList());
+
+ List<ProcessTaskRelation> processTaskRelations =
processService.findRelationByCode(processDefinition.getProjectCode(),
processDefinition.getCode());
+ List<TaskDefinitionLog> taskDefinitionLogs =
processService.getTaskDefineLogListByRelation(processTaskRelations);
+ List<TaskNode> taskNodeList =
processService.transformTask(processTaskRelations, taskDefinitionLogs);
forbiddenTaskMap.clear();
taskNodeList.forEach(taskNode -> {
@@ -759,7 +763,7 @@ public class WorkflowExecuteThread implements Runnable {
completeTaskMap.clear();
errorTaskMap.clear();
- if (ExecutionStatus.SUBMITTED_SUCCESS != processInstance.getState()) {
+ if (!isNewProcessInstance()) {
List<TaskInstance> validTaskInstanceList =
processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance task : validTaskInstanceList) {
validTaskMap.put(Long.toString(task.getTaskCode()),
task.getId());
@@ -1625,4 +1629,28 @@ public class WorkflowExecuteThread implements Runnable {
TaskDependType depNodeType) throws
Exception {
return DagHelper.generateFlowDag(totalTaskNodeList, startNodeNameList,
recoveryNodeCodeList, depNodeType);
}
+
+ /**
+ * check task queue
+ */
+ private boolean checkTaskQueue() {
+ AtomicBoolean result = new AtomicBoolean(false);
+ taskInstanceMap.forEach((id, taskInstance) -> {
+ if (taskInstance != null && taskInstance.getTaskGroupId() > 0) {
+ result.set(true);
+ }
+ });
+ return result.get();
+ }
+
+ /**
+ * is new process instance
+ */
+ private boolean isNewProcessInstance() {
+ if (ExecutionStatus.RUNNING_EXECUTION == processInstance.getState() &&
processInstance.getRunTimes() == 1) {
+ return true;
+ } else {
+ return false;
+ }
+ }
}
diff --git a/dolphinscheduler-server/src/main/resources/application-master.yaml
b/dolphinscheduler-server/src/main/resources/application-master.yaml
index 866b200..12ed516 100644
--- a/dolphinscheduler-server/src/main/resources/application-master.yaml
+++ b/dolphinscheduler-server/src/main/resources/application-master.yaml
@@ -18,7 +18,7 @@ spring:
application:
name: master-server
cache:
- # default enable cache, you can disable by `type: none`
+ # default unable cache, you can disable by `type: caffeine`
type: none
cache-names:
- tenant
@@ -26,6 +26,8 @@ spring:
- processDefinition
- processTaskRelation
- taskDefinition
+ - workerGroup
+ - schedule
caffeine:
spec: maximumSize=100,expireAfterWrite=300s,recordStats
@@ -37,8 +39,6 @@ master:
pre-exec-threads: 10
# master execute thread number to limit process instances in parallel
exec-threads: 100
- # master execute task number in parallel per process instance
- exec-task-num: 20
# master dispatch task number per batch
dispatch-task-number: 3
# master host selector to select a suitable worker, default value:
LowerWeight. Optional values include random, round_robin, lower_weight
@@ -54,8 +54,6 @@ master:
max-cpu-load-avg: -1
# master reserved memory, only lower than system available memory, master
server can schedule. default value 0.3, the unit is G
reserved-memory: 0.3
- # master cache process definition, default: true
- cache-process-definition: true
server:
port: 5679
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
index 6f1907c..5c177ca 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
@@ -66,7 +66,7 @@ public class CacheProcessorTest {
public void testProcess() {
Tenant tenant = new Tenant();
tenant.setId(1);
- CacheExpireCommand cacheExpireCommand = new
CacheExpireCommand(CacheType.TENANT, tenant);
+ CacheExpireCommand cacheExpireCommand = new
CacheExpireCommand(CacheType.TENANT, "1");
Command command = cacheExpireCommand.convert2Command();
cacheProcessor.process(channel, command);
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index c3ee5c8..e11a108 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -2418,6 +2418,19 @@ public class ProcessService {
return
taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
}
+ public List<TaskDefinitionLog>
getTaskDefineLogListByRelation(List<ProcessTaskRelation> processTaskRelations) {
+ List<TaskDefinitionLog> taskDefinitionLogs =
com.google.common.collect.Lists.newArrayList();
+ for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
+ if (processTaskRelation.getPreTaskCode() > 0) {
+ taskDefinitionLogs.add((TaskDefinitionLog)
this.findTaskDefinition(processTaskRelation.getPreTaskCode(),
processTaskRelation.getPreTaskVersion()));
+ }
+ if (processTaskRelation.getPostTaskCode() > 0) {
+ taskDefinitionLogs.add((TaskDefinitionLog)
this.findTaskDefinition(processTaskRelation.getPostTaskCode(),
processTaskRelation.getPostTaskVersion()));
+ }
+ }
+ return taskDefinitionLogs;
+ }
+
/**
* find task definition by code and version
*/
@@ -2499,7 +2512,7 @@ public class ProcessService {
return taskNodeList;
}
- public Map<ProcessInstance, TaskInstance> notifyProcessList(int processId,
int taskId) {
+ public Map<ProcessInstance, TaskInstance> notifyProcessList(int processId)
{
HashMap<ProcessInstance, TaskInstance> processTaskMap = new
HashMap<>();
//find sub tasks
ProcessInstanceMap processInstanceMap =
processInstanceMapMapper.queryBySubProcessId(processId);
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java
index 04d1f78..a3dafb6 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java
@@ -61,7 +61,7 @@ public class CacheNotifyServiceTest {
public void testNotifyMaster() {
User user1 = new User();
user1.setId(100);
- Command cacheExpireCommand = new CacheExpireCommand(CacheType.USER,
user1).convert2Command();
+ Command cacheExpireCommand = new CacheExpireCommand(CacheType.USER,
"100").convert2Command();
NettyServerConfig serverConfig = new NettyServerConfig();