This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new a80a7b3  [Feature-6963][MasterServer] unified cache manager (#7187)
a80a7b3 is described below

commit a80a7b3fcf6154764ef82b3b6536c8ee54f90444
Author: wind <[email protected]>
AuthorDate: Tue Dec 7 15:55:22 2021 +0800

    [Feature-6963][MasterServer] unified cache manager (#7187)
    
    * unified cache
    
    * reduce db select
    
    * checkstyle
    
    Co-authored-by: caishunfeng <[email protected]>
---
 .../api/aspect/CacheEvictAspect.java               |  32 +++--
 .../dao/mapper/ProcessDefinitionMapper.java        |   9 +-
 .../dao/mapper/ProcessTaskRelationMapper.java      |  43 ++++---
 .../dao/mapper/ScheduleMapper.java                 |   2 +-
 .../dao/mapper/TaskDefinitionLogMapper.java        |  11 +-
 .../dolphinscheduler/dao/mapper/TenantMapper.java  |   5 +-
 .../dolphinscheduler/dao/mapper/UserMapper.java    |   5 +-
 .../dao/mapper/WorkerGroupMapper.java              |  10 +-
 .../remote/command/CacheExpireCommand.java         |  18 +--
 .../command/cache/CacheExpireCommandTest.java      |   5 +-
 .../server/master/config/MasterConfig.java         |   9 --
 .../server/master/processor/CacheProcessor.java    | 143 +--------------------
 .../server/master/runner/EventExecuteService.java  |   7 +-
 .../master/runner/WorkflowExecuteThread.java       |  44 +++++--
 .../src/main/resources/application-master.yaml     |   8 +-
 .../master/processor/CacheProcessorTest.java       |   2 +-
 .../service/process/ProcessService.java            |  15 ++-
 .../service/cache/CacheNotifyServiceTest.java      |   2 +-
 18 files changed, 138 insertions(+), 232 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
index 336be21..39d6975 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
@@ -22,6 +22,8 @@ import 
org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
 import org.apache.dolphinscheduler.service.cache.CacheNotifyService;
 import org.apache.dolphinscheduler.service.cache.impl.CacheKeyGenerator;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
@@ -34,6 +36,8 @@ import org.aspectj.lang.annotation.Around;
 import org.aspectj.lang.annotation.Aspect;
 import org.aspectj.lang.annotation.Pointcut;
 import org.aspectj.lang.reflect.MethodSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.context.properties.bind.Name;
 import org.springframework.cache.annotation.CacheConfig;
@@ -50,7 +54,12 @@ import org.springframework.stereotype.Component;
 @Component
 public class CacheEvictAspect {
 
-    private static final String UPDATE_BY_ID = "updateById";
+    private static final Logger logger = 
LoggerFactory.getLogger(CacheEvictAspect.class);
+
+    /**
+     * symbol of spring el
+     */
+    private static final String EL_SYMBOL = "#";
 
     @Autowired
     private CacheKeyGenerator cacheKeyGenerator;
@@ -77,17 +86,18 @@ public class CacheEvictAspect {
 
         CacheType cacheType = getCacheType(cacheConfig, cacheEvict);
         if (cacheType != null) {
-            // todo use springEL is better
-            if (method.getName().equalsIgnoreCase(UPDATE_BY_ID) && args.length 
== 1) {
-                Object updateObj = args[0];
-                cacheNotifyService.notifyMaster(new 
CacheExpireCommand(cacheType, updateObj).convert2Command());
-            } else if (!cacheEvict.key().isEmpty()) {
-                List<Name> paramsList = getParamAnnotationsByType(method, 
Name.class);
-                String key = parseKey(cacheEvict.key(), 
paramsList.stream().map(o -> o.value()).collect(Collectors.toList()), 
Arrays.asList(args));
-                cacheNotifyService.notifyMaster(new 
CacheExpireCommand(cacheType, key).convert2Command());
+            String cacheKey;
+            if (cacheEvict.key().isEmpty()) {
+                cacheKey = (String) cacheKeyGenerator.generate(target, method, 
args);
             } else {
-                Object key = cacheKeyGenerator.generate(target, method, args);
-                cacheNotifyService.notifyMaster(new 
CacheExpireCommand(cacheType, key).convert2Command());
+                cacheKey = cacheEvict.key();
+                List<Name> paramsList = getParamAnnotationsByType(method, 
Name.class);
+                if (cacheEvict.key().contains(EL_SYMBOL)) {
+                    cacheKey = parseKey(cacheEvict.key(), 
paramsList.stream().map(o -> o.value()).collect(Collectors.toList()), 
Arrays.asList(args));
+                }
+            }
+            if (StringUtils.isNotEmpty(cacheKey)) {
+                cacheNotifyService.notifyMaster(new 
CacheExpireCommand(cacheType, cacheKey).convert2Command());
             }
         }
 
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
index ca287e2..954b55b 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
@@ -27,6 +27,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import org.springframework.boot.context.properties.bind.Name;
 import org.springframework.cache.annotation.CacheConfig;
 import org.springframework.cache.annotation.CacheEvict;
 import org.springframework.cache.annotation.Cacheable;
@@ -52,8 +53,8 @@ public interface ProcessDefinitionMapper extends 
BaseMapper<ProcessDefinition> {
     /**
      * update
      */
-    @CacheEvict
-    int updateById(@Param("et") ProcessDefinition processDefinition);
+    @CacheEvict(key = "#processDefinition.code")
+    int updateById(@Name("processDefinition") @Param("et") ProcessDefinition 
processDefinition);
 
     /**
      * query process definition by code list
@@ -69,8 +70,8 @@ public interface ProcessDefinitionMapper extends 
BaseMapper<ProcessDefinition> {
      * @param code code
      * @return delete result
      */
-    @CacheEvict
-    int deleteByCode(@Param("code") long code);
+    @CacheEvict(key = "#code")
+    int deleteByCode(@Name("code") @Param("code") long code);
 
     /**
      * verify process definition by name
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
index 5b3ea75..010b8e4 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
@@ -25,6 +25,7 @@ import org.apache.ibatis.annotations.Param;
 import java.util.List;
 import java.util.Map;
 
+import org.springframework.boot.context.properties.bind.Name;
 import org.springframework.cache.annotation.CacheConfig;
 import org.springframework.cache.annotation.CacheEvict;
 import org.springframework.cache.annotation.Cacheable;
@@ -51,8 +52,8 @@ public interface ProcessTaskRelationMapper extends 
BaseMapper<ProcessTaskRelatio
     /**
      * update
      */
-    @CacheEvict
-    int updateById(@Param("et") ProcessTaskRelation processTaskRelation);
+    @CacheEvict(key = "#processTaskRelation.projectCode + '_' + 
#processTaskRelation.processDefinitionCode")
+    int updateById(@Name("processTaskRelation") @Param("et") 
ProcessTaskRelation processTaskRelation);
 
     /**
      * process task relation by taskCode
@@ -77,9 +78,9 @@ public interface ProcessTaskRelationMapper extends 
BaseMapper<ProcessTaskRelatio
      * @param processCode processCode
      * @return int
      */
-    @CacheEvict
-    int deleteByCode(@Param("projectCode") long projectCode,
-                     @Param("processCode") long processCode);
+    @CacheEvict(key = "#projectCode + '_' + #processCode")
+    int deleteByCode(@Name("projectCode") @Param("projectCode") long 
projectCode,
+                     @Name("processCode") @Param("processCode") long 
processCode);
 
     /**
      * batch insert process task relation
@@ -101,7 +102,7 @@ public interface ProcessTaskRelationMapper extends 
BaseMapper<ProcessTaskRelatio
      * query upstream process task relation by taskCode
      *
      * @param projectCode projectCode
-     * @param taskCode    taskCode
+     * @param taskCode taskCode
      * @return ProcessTaskRelation
      */
     List<ProcessTaskRelation> queryUpstreamByCode(@Param("projectCode") long 
projectCode, @Param("taskCode") long taskCode);
@@ -110,7 +111,7 @@ public interface ProcessTaskRelationMapper extends 
BaseMapper<ProcessTaskRelatio
      * query downstream process task relation by taskCode
      *
      * @param projectCode projectCode
-     * @param taskCode    taskCode
+     * @param taskCode taskCode
      * @return ProcessTaskRelation
      */
     List<ProcessTaskRelation> queryDownstreamByCode(@Param("projectCode") long 
projectCode, @Param("taskCode") long taskCode);
@@ -118,24 +119,24 @@ public interface ProcessTaskRelationMapper extends 
BaseMapper<ProcessTaskRelatio
     /**
      * query task relation by codes
      *
-     * @param projectCode  projectCode
-     * @param taskCode     taskCode
+     * @param projectCode projectCode
+     * @param taskCode taskCode
      * @param preTaskCodes preTaskCode list
      * @return ProcessTaskRelation
      */
-    List<ProcessTaskRelation> queryUpstreamByCodes(@Param("projectCode") long 
projectCode, @Param("taskCode") long taskCode,@Param("preTaskCodes") Long[] 
preTaskCodes);
+    List<ProcessTaskRelation> queryUpstreamByCodes(@Param("projectCode") long 
projectCode, @Param("taskCode") long taskCode, @Param("preTaskCodes") Long[] 
preTaskCodes);
 
     /**
      * count upstream by codes
      *
      * @param projectCode projectCode
-     * @param taskCode    taskCode
-     * @param processDefinitionCodes    processDefinitionCodes
+     * @param taskCode taskCode
+     * @param processDefinitionCodes processDefinitionCodes
      * @return upstream count list group by process definition code
      */
     List<Map<String, Long>> 
countUpstreamByCodeGroupByProcessDefinitionCode(@Param("projectCode") long 
projectCode,
-                                                                             
@Param("processDefinitionCodes") Long[] processDefinitionCodes,
-                                                                             
@Param("taskCode") long taskCode);
+                                                                            
@Param("processDefinitionCodes") Long[] processDefinitionCodes,
+                                                                            
@Param("taskCode") long taskCode);
 
     /**
      * batch update process task relation pre task
@@ -148,10 +149,10 @@ public interface ProcessTaskRelationMapper extends 
BaseMapper<ProcessTaskRelatio
     /**
      * query by code
      *
-     * @param projectCode           projectCode
+     * @param projectCode projectCode
      * @param processDefinitionCode processDefinitionCode
-     * @param preTaskCode           preTaskCode
-     * @param postTaskCode          postTaskCode
+     * @param preTaskCode preTaskCode
+     * @param postTaskCode postTaskCode
      * @return ProcessTaskRelation
      */
     List<ProcessTaskRelation> queryByCode(@Param("projectCode") long 
projectCode,
@@ -162,7 +163,7 @@ public interface ProcessTaskRelationMapper extends 
BaseMapper<ProcessTaskRelatio
     /**
      * delete process task relation
      *
-     * @param processTaskRelationLog  processTaskRelationLog
+     * @param processTaskRelationLog processTaskRelationLog
      * @return int
      */
     int deleteRelation(@Param("processTaskRelationLog") ProcessTaskRelationLog 
processTaskRelationLog);
@@ -170,10 +171,10 @@ public interface ProcessTaskRelationMapper extends 
BaseMapper<ProcessTaskRelatio
     /**
      * count by code
      *
-     * @param projectCode           projectCode
+     * @param projectCode projectCode
      * @param processDefinitionCode processDefinitionCode
-     * @param preTaskCode           preTaskCode
-     * @param postTaskCode          postTaskCode
+     * @param preTaskCode preTaskCode
+     * @param postTaskCode postTaskCode
      * @return ProcessTaskRelation
      */
     int countByCode(@Param("projectCode") long projectCode,
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
index 1dc7cff..52fabae 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
@@ -79,6 +79,6 @@ public interface ScheduleMapper extends BaseMapper<Schedule> {
     @CacheEvict(key = "#entity.processDefinitionCode")
     int insert(@Name("entity") Schedule entity);
 
-    @CacheEvict
+    @CacheEvict(key = "#entity.processDefinitionCode")
     int updateById(@Name("entity") @Param("et")Schedule entity);
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
index 75e1b29..30ee855 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
@@ -25,6 +25,7 @@ import org.apache.ibatis.annotations.Param;
 import java.util.Collection;
 import java.util.List;
 
+import org.springframework.boot.context.properties.bind.Name;
 import org.springframework.cache.annotation.CacheConfig;
 import org.springframework.cache.annotation.CacheEvict;
 import org.springframework.cache.annotation.Cacheable;
@@ -53,15 +54,15 @@ public interface TaskDefinitionLogMapper extends 
BaseMapper<TaskDefinitionLog> {
      * @param version version
      * @return task definition log
      */
-    @Cacheable(sync = true, key = "#taskCode + '_' + #taskDefinitionVersion")
+    @Cacheable(sync = true, key = "#code + '_' + #version")
     TaskDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long code,
                                                       @Param("version") int 
version);
 
     /**
      * update
      */
-    @CacheEvict
-    int updateById(@Param("et") TaskDefinitionLog taskDefinitionLog);
+    @CacheEvict(key = "#taskDefinitionLog.code + '_' + 
#taskDefinitionLog.version")
+    int updateById(@Name("taskDefinitionLog") @Param("et") TaskDefinitionLog 
taskDefinitionLog);
 
     /**
      * @param taskDefinitions taskDefinition list
@@ -84,8 +85,8 @@ public interface TaskDefinitionLogMapper extends 
BaseMapper<TaskDefinitionLog> {
      * @param version task definition version
      * @return delete result
      */
-    @CacheEvict
-    int deleteByCodeAndVersion(@Param("code") long code, @Param("version") int 
version);
+    @CacheEvict(key = "#code + '_' #version")
+    int deleteByCodeAndVersion(@Name("code") @Param("code") long code, 
@Name("version") @Param("version") int version);
 
     /**
      * query the paging task definition version list by pagination info
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java
index 843122f..0def8b8 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
 
 import org.apache.ibatis.annotations.Param;
 
+import org.springframework.boot.context.properties.bind.Name;
 import org.springframework.cache.annotation.CacheConfig;
 import org.springframework.cache.annotation.CacheEvict;
 import org.springframework.cache.annotation.Cacheable;
@@ -52,8 +53,8 @@ public interface TenantMapper extends BaseMapper<Tenant> {
     /**
      * update
      */
-    @CacheEvict
-    int updateById(@Param("et") Tenant tenant);
+    @CacheEvict(key = "#tenant.id")
+    int updateById(@Name("tenant") @Param("et") Tenant tenant);
 
     /**
      * query tenant by code
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java
index 1d5ad8f..460c954 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java
@@ -23,6 +23,7 @@ import org.apache.ibatis.annotations.Param;
 
 import java.util.List;
 
+import org.springframework.boot.context.properties.bind.Name;
 import org.springframework.cache.annotation.CacheConfig;
 import org.springframework.cache.annotation.CacheEvict;
 import org.springframework.cache.annotation.Cacheable;
@@ -52,8 +53,8 @@ public interface UserMapper extends BaseMapper<User> {
     /**
      * update
      */
-    @CacheEvict
-    int updateById(@Param("et") User user);
+    @CacheEvict(key = "#user.id")
+    int updateById(@Name("user") @Param("et") User user);
 
     /**
      * query all general user
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
index b932c0b..bfe01f8 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
@@ -37,22 +37,24 @@ public interface WorkerGroupMapper extends 
BaseMapper<WorkerGroup> {
 
     /**
      * query all worker group
+     *
      * @return worker group list
      */
-    @Cacheable(sync = true, key = "'all'")
+    @Cacheable(sync = true, key = "all")
     List<WorkerGroup> queryAllWorkerGroup();
 
-    @CacheEvict
+    @CacheEvict(key = "all")
     int deleteById(Integer id);
 
-    @CacheEvict
+    @CacheEvict(key = "all")
     int insert(WorkerGroup entity);
 
-    @CacheEvict
+    @CacheEvict(key = "all")
     int updateById(@Param("et") WorkerGroup entity);
 
     /**
      * query worer grouop by name
+     *
      * @param name name
      * @return worker group list
      */
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CacheExpireCommand.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CacheExpireCommand.java
index 26170af..a32d4fc 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CacheExpireCommand.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CacheExpireCommand.java
@@ -28,29 +28,23 @@ import java.io.Serializable;
 public class CacheExpireCommand implements Serializable {
 
     private CacheType cacheType;
-    private Class updateObjClass;
-    private String updateObjJson;
+    private String cacheKey;
 
     public CacheExpireCommand() {
         super();
     }
 
-    public CacheExpireCommand(CacheType cacheType, Object updateObj) {
+    public CacheExpireCommand(CacheType cacheType, String cacheKey) {
         this.cacheType = cacheType;
-        this.updateObjClass = updateObj.getClass();
-        this.updateObjJson = JSONUtils.toJsonString(updateObj);
+        this.cacheKey = cacheKey;
     }
 
     public CacheType getCacheType() {
         return cacheType;
     }
 
-    public Class getUpdateObjClass() {
-        return updateObjClass;
-    }
-
-    public String getUpdateObjJson() {
-        return updateObjJson;
+    public String getCacheKey() {
+        return cacheKey;
     }
 
     /**
@@ -68,6 +62,6 @@ public class CacheExpireCommand implements Serializable {
 
     @Override
     public String toString() {
-        return String.format("CacheExpireCommand{CacheType=%s, 
updateObjClass=%s, updateObjJson=%s}", cacheType, updateObjClass, 
updateObjJson);
+        return String.format("CacheExpireCommand{CacheType=%s, cacheKey=%s}", 
cacheType, cacheKey);
     }
 }
diff --git 
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/cache/CacheExpireCommandTest.java
 
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/cache/CacheExpireCommandTest.java
index 1c91d15..2351234 100644
--- 
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/cache/CacheExpireCommandTest.java
+++ 
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/cache/CacheExpireCommandTest.java
@@ -29,10 +29,7 @@ public class CacheExpireCommandTest {
 
     @Test
     public void testConvert2Command() {
-        CacheExpireCommand cacheExpireCommand = new 
CacheExpireCommand(CacheType.TENANT, 1);
-        Assert.assertEquals(Integer.class, 
cacheExpireCommand.getUpdateObjClass());
-        Assert.assertEquals("1", cacheExpireCommand.getUpdateObjJson());
-
+        CacheExpireCommand cacheExpireCommand = new 
CacheExpireCommand(CacheType.TENANT, "1");
         Command command = cacheExpireCommand.convert2Command();
         Assert.assertEquals(CommandType.CACHE_EXPIRE, command.getType());
     }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index d4a9958..88ecfbd 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -39,7 +39,6 @@ public class MasterConfig {
     private int stateWheelInterval;
     private double maxCpuLoadAvg;
     private double reservedMemory;
-    private boolean cacheProcessDefinition;
 
     public int getListenPort() {
         return listenPort;
@@ -136,12 +135,4 @@ public class MasterConfig {
     public void setReservedMemory(double reservedMemory) {
         this.reservedMemory = reservedMemory;
     }
-
-    public boolean isCacheProcessDefinition() {
-        return cacheProcessDefinition;
-    }
-
-    public void setCacheProcessDefinition(boolean cacheProcessDefinition) {
-        this.cacheProcessDefinition = cacheProcessDefinition;
-    }
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
index c66a57b..6db7f65 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
@@ -19,14 +19,6 @@ package org.apache.dolphinscheduler.server.master.processor;
 
 import org.apache.dolphinscheduler.common.enums.CacheType;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
-import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
-import org.apache.dolphinscheduler.dao.entity.Queue;
-import org.apache.dolphinscheduler.dao.entity.Schedule;
-import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
-import org.apache.dolphinscheduler.dao.entity.Tenant;
-import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
 import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -67,142 +59,15 @@ public class CacheProcessor implements 
NettyRequestProcessor {
             cacheManager = 
SpringApplicationContext.getBean(CacheManager.class);
         }
 
-        Object object = 
JSONUtils.parseObject(cacheExpireCommand.getUpdateObjJson(), 
cacheExpireCommand.getUpdateObjClass());
-        if (object == null) {
+        if (cacheExpireCommand.getCacheKey().isEmpty()) {
             return;
         }
 
         CacheType cacheType = cacheExpireCommand.getCacheType();
-        switch (cacheType) {
-            case TENANT:
-                if (object instanceof Tenant) {
-                    Tenant tenant = (Tenant) object;
-                    tenantCacheExpire(tenant);
-                }
-                break;
-            case USER:
-                if (object instanceof User) {
-                    User user = (User) object;
-                    userCacheExpire(user);
-                }
-                break;
-            case QUEUE:
-                if (object instanceof Queue) {
-                    Queue queue = (Queue) object;
-                    queueCacheExpire(queue);
-                }
-                break;
-            case PROCESS_DEFINITION:
-                if (object instanceof ProcessDefinition) {
-                    ProcessDefinition processDefinition = (ProcessDefinition) 
object;
-                    processDefinitionCacheExpire(processDefinition);
-                }
-                break;
-            case TASK_DEFINITION:
-                if (object instanceof TaskDefinition) {
-                    TaskDefinition taskDefinition = (TaskDefinition) object;
-                    taskDefinitionCacheExpire(taskDefinition);
-                }
-                break;
-            case PROCESS_TASK_RELATION:
-                if (object instanceof ProcessTaskRelation) {
-                    ProcessTaskRelation processTaskRelation = 
(ProcessTaskRelation) object;
-                    processTaskRelationCacheExpire(processTaskRelation);
-                }
-                break;
-            case WORKER_GROUP:
-                if (object instanceof WorkerGroup) {
-                    WorkerGroup workerGroup = (WorkerGroup) object;
-                    workerGroupCacheExpire(workerGroup);
-                }
-                break;
-            case SCHEDULE:
-                if (object instanceof Schedule) {
-                    Schedule schedule = (Schedule) object;
-                    scheduleCacheExpire(schedule);
-                }
-                break;
-            default:
-                logger.error("no support cache type:{}", cacheType);
-        }
-
-        // if delete operation, just send key
-        if (object instanceof String) {
-            Cache cache = cacheManager.getCache(cacheType.getCacheName());
-            if (cache != null) {
-                cache.evict(object);
-                logger.info("cache evict, type:{}, key:{}", 
cacheType.getCacheName(), object);
-            }
-        }
-    }
-
-    private void tenantCacheExpire(Tenant tenant) {
-        Cache cache = cacheManager.getCache(CacheType.TENANT.getCacheName());
-        if (cache != null) {
-            cache.evict(tenant.getId());
-            logger.info("cache evict, type:{}, key:{}", 
CacheType.TENANT.getCacheName(), tenant.getId());
-        }
-    }
-
-    private void userCacheExpire(User user) {
-        Cache cache = cacheManager.getCache(CacheType.USER.getCacheName());
-        if (cache != null) {
-            cache.evict(user.getId());
-            logger.info("cache evict, type:{}, key:{}", 
CacheType.USER.getCacheName(), user.getId());
-        }
-    }
-
-    private void queueCacheExpire(Queue queue) {
-        Cache cache = cacheManager.getCache(CacheType.USER.getCacheName());
-        if (cache != null) {
-            cache.clear();
-            logger.info("cache evict, type:{}, clear", 
CacheType.USER.getCacheName());
-        }
-    }
-
-    private void processDefinitionCacheExpire(ProcessDefinition 
processDefinition) {
-        Cache cache = 
cacheManager.getCache(CacheType.PROCESS_DEFINITION.getCacheName());
-        if (cache != null) {
-            cache.evict(processDefinition.getCode());
-            cache.evict(processDefinition.getCode() + "_" + 
processDefinition.getVersion());
-            logger.info("cache evict, type:{}, key:{}",
-                    CacheType.PROCESS_DEFINITION.getCacheName(), 
processDefinition.getCode() + "_" + processDefinition.getVersion());
-        }
-    }
-
-    private void processTaskRelationCacheExpire(ProcessTaskRelation 
processTaskRelation) {
-        Cache cache = 
cacheManager.getCache(CacheType.PROCESS_TASK_RELATION.getCacheName());
-        if (cache != null) {
-            cache.evict(processTaskRelation.getProjectCode() + "_" + 
processTaskRelation.getProcessDefinitionCode());
-            logger.info("cache evict, type:{}, key:{}",
-                    CacheType.PROCESS_TASK_RELATION.getCacheName(), 
processTaskRelation.getProjectCode() + "_" + 
processTaskRelation.getProcessDefinitionCode());
-        }
-    }
-
-    private void taskDefinitionCacheExpire(TaskDefinition taskDefinition) {
-        Cache cache = 
cacheManager.getCache(CacheType.TASK_DEFINITION.getCacheName());
-        if (cache != null) {
-            cache.evict(taskDefinition.getCode() + "_" + 
taskDefinition.getVersion());
-            logger.info("cache evict, type:{}, key:{}",
-                    CacheType.TASK_DEFINITION.getCacheName(), 
taskDefinition.getCode() + "_" + taskDefinition.getVersion());
-        }
-    }
-
-    private void workerGroupCacheExpire(WorkerGroup workerGroup) {
-        Cache cache = 
cacheManager.getCache(CacheType.WORKER_GROUP.getCacheName());
-        if (cache != null) {
-            cache.evict("all");
-            logger.info("cache evict, type:{}, key:{}",
-                    CacheType.WORKER_GROUP.getCacheName(), "all");
-        }
-    }
-
-    private void scheduleCacheExpire(Schedule schedule) {
-        Cache cache = cacheManager.getCache(CacheType.SCHEDULE.getCacheName());
+        Cache cache = cacheManager.getCache(cacheType.getCacheName());
         if (cache != null) {
-            cache.evict(schedule.getProcessDefinitionCode());
-            logger.info("cache evict, type:{}, key:{}",
-                    CacheType.SCHEDULE.getCacheName(), 
schedule.getProcessDefinitionCode());
+            cache.evict(cacheExpireCommand.getCacheKey());
+            logger.info("cache evict, type:{}, key:{}", 
cacheType.getCacheName(), cacheExpireCommand.getCacheKey());
         }
     }
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
index c40618a..3da043c 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.runner;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.StateEvent;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
 import org.apache.dolphinscheduler.common.thread.Stopper;
@@ -146,9 +147,11 @@ public class EventExecuteService extends Thread {
                 }
 
                 private void notifyProcessChanged() {
-                    Map<ProcessInstance, TaskInstance> fatherMaps
-                            = 
processService.notifyProcessList(processInstanceId, 0);
+                    if (Flag.NO == 
workflowExecuteThread.getProcessInstance().getIsSubProcess()) {
+                        return;
+                    }
 
+                    Map<ProcessInstance, TaskInstance> fatherMaps = 
processService.notifyProcessList(processInstanceId);
                     for (ProcessInstance processInstance : 
fatherMaps.keySet()) {
                         String address = 
NetUtils.getAddr(masterConfig.getListenPort());
                         if 
(processInstance.getHost().equalsIgnoreCase(address)) {
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 9116ce5..fe60ec6 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -51,9 +51,11 @@ import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.Environment;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
 import org.apache.dolphinscheduler.dao.entity.ProjectUser;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.utils.DagHelper;
@@ -84,6 +86,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -682,11 +685,10 @@ public class WorkflowExecuteThread implements Runnable {
             ProjectUser projectUser = 
processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
             processAlertManager.sendAlertProcessInstance(processInstance, 
getValidTaskList(), projectUser);
         }
-        List<TaskInstance> taskInstances = 
processService.findValidTaskListByProcessId(processInstance.getId());
-        ProjectUser projectUser = 
processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
-        processAlertManager.sendAlertProcessInstance(processInstance, 
taskInstances, projectUser);
-        //release task group
-        processService.releaseAllTaskGroup(processInstance.getId());
+        if (checkTaskQueue()) {
+            //release task group
+            processService.releaseAllTaskGroup(processInstance.getId());
+        }
     }
 
     public void checkSerialProcess(ProcessDefinition processDefinition) {
@@ -725,8 +727,10 @@ public class WorkflowExecuteThread implements Runnable {
         processInstance.setProcessDefinition(processDefinition);
 
         List<TaskInstance> recoverNodeList = 
getStartTaskInstanceList(processInstance.getCommandParam());
-        List<TaskNode> taskNodeList =
-                
processService.transformTask(processService.findRelationByCode(processDefinition.getProjectCode(),
 processDefinition.getCode()), Lists.newArrayList());
+
+        List<ProcessTaskRelation> processTaskRelations = 
processService.findRelationByCode(processDefinition.getProjectCode(), 
processDefinition.getCode());
+        List<TaskDefinitionLog> taskDefinitionLogs = 
processService.getTaskDefineLogListByRelation(processTaskRelations);
+        List<TaskNode> taskNodeList = 
processService.transformTask(processTaskRelations, taskDefinitionLogs);
         forbiddenTaskMap.clear();
 
         taskNodeList.forEach(taskNode -> {
@@ -759,7 +763,7 @@ public class WorkflowExecuteThread implements Runnable {
         completeTaskMap.clear();
         errorTaskMap.clear();
 
-        if (ExecutionStatus.SUBMITTED_SUCCESS != processInstance.getState()) {
+        if (!isNewProcessInstance()) {
             List<TaskInstance> validTaskInstanceList = 
processService.findValidTaskListByProcessId(processInstance.getId());
             for (TaskInstance task : validTaskInstanceList) {
                 validTaskMap.put(Long.toString(task.getTaskCode()), 
task.getId());
@@ -1625,4 +1629,28 @@ public class WorkflowExecuteThread implements Runnable {
                                       TaskDependType depNodeType) throws 
Exception {
         return DagHelper.generateFlowDag(totalTaskNodeList, startNodeNameList, 
recoveryNodeCodeList, depNodeType);
     }
+
+    /**
+     * check task queue
+     */
+    private boolean checkTaskQueue() {
+        AtomicBoolean result = new AtomicBoolean(false);
+        taskInstanceMap.forEach((id, taskInstance) -> {
+            if (taskInstance != null && taskInstance.getTaskGroupId() > 0) {
+                result.set(true);
+            }
+        });
+        return result.get();
+    }
+
+    /**
+     * is new process instance
+     */
+    private boolean isNewProcessInstance() {
+        if (ExecutionStatus.RUNNING_EXECUTION == processInstance.getState() && 
processInstance.getRunTimes() == 1) {
+            return true;
+        } else {
+            return false;
+        }
+    }
 }
diff --git a/dolphinscheduler-server/src/main/resources/application-master.yaml 
b/dolphinscheduler-server/src/main/resources/application-master.yaml
index 866b200..12ed516 100644
--- a/dolphinscheduler-server/src/main/resources/application-master.yaml
+++ b/dolphinscheduler-server/src/main/resources/application-master.yaml
@@ -18,7 +18,7 @@ spring:
   application:
     name: master-server
   cache:
-    # default enable cache, you can disable by `type: none`
+    # default unable cache, you can disable by `type: caffeine`
     type: none
     cache-names:
       - tenant
@@ -26,6 +26,8 @@ spring:
       - processDefinition
       - processTaskRelation
       - taskDefinition
+      - workerGroup
+      - schedule
     caffeine:
       spec: maximumSize=100,expireAfterWrite=300s,recordStats
 
@@ -37,8 +39,6 @@ master:
   pre-exec-threads: 10
   # master execute thread number to limit process instances in parallel
   exec-threads: 100
-  # master execute task number in parallel per process instance
-  exec-task-num: 20
   # master dispatch task number per batch
   dispatch-task-number: 3
   # master host selector to select a suitable worker, default value: 
LowerWeight. Optional values include random, round_robin, lower_weight
@@ -54,8 +54,6 @@ master:
   max-cpu-load-avg: -1
   # master reserved memory, only lower than system available memory, master 
server can schedule. default value 0.3, the unit is G
   reserved-memory: 0.3
-  # master cache process definition, default: true
-  cache-process-definition: true
 
 server:
   port: 5679
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
index 6f1907c..5c177ca 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
@@ -66,7 +66,7 @@ public class CacheProcessorTest {
     public void testProcess() {
         Tenant tenant = new Tenant();
         tenant.setId(1);
-        CacheExpireCommand cacheExpireCommand = new 
CacheExpireCommand(CacheType.TENANT, tenant);
+        CacheExpireCommand cacheExpireCommand = new 
CacheExpireCommand(CacheType.TENANT, "1");
         Command command = cacheExpireCommand.convert2Command();
 
         cacheProcessor.process(channel, command);
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index c3ee5c8..e11a108 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -2418,6 +2418,19 @@ public class ProcessService {
         return 
taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
     }
 
+    public List<TaskDefinitionLog> 
getTaskDefineLogListByRelation(List<ProcessTaskRelation> processTaskRelations) {
+        List<TaskDefinitionLog> taskDefinitionLogs = 
com.google.common.collect.Lists.newArrayList();
+        for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
+            if (processTaskRelation.getPreTaskCode() > 0) {
+                taskDefinitionLogs.add((TaskDefinitionLog) 
this.findTaskDefinition(processTaskRelation.getPreTaskCode(), 
processTaskRelation.getPreTaskVersion()));
+            }
+            if (processTaskRelation.getPostTaskCode() > 0) {
+                taskDefinitionLogs.add((TaskDefinitionLog) 
this.findTaskDefinition(processTaskRelation.getPostTaskCode(), 
processTaskRelation.getPostTaskVersion()));
+            }
+        }
+        return taskDefinitionLogs;
+    }
+
     /**
      * find task definition by code and version
      */
@@ -2499,7 +2512,7 @@ public class ProcessService {
         return taskNodeList;
     }
 
-    public Map<ProcessInstance, TaskInstance> notifyProcessList(int processId, 
int taskId) {
+    public Map<ProcessInstance, TaskInstance> notifyProcessList(int processId) 
{
         HashMap<ProcessInstance, TaskInstance> processTaskMap = new 
HashMap<>();
         //find sub tasks
         ProcessInstanceMap processInstanceMap = 
processInstanceMapMapper.queryBySubProcessId(processId);
diff --git 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java
 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java
index 04d1f78..a3dafb6 100644
--- 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java
+++ 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java
@@ -61,7 +61,7 @@ public class CacheNotifyServiceTest {
     public void testNotifyMaster() {
         User user1 = new User();
         user1.setId(100);
-        Command cacheExpireCommand = new CacheExpireCommand(CacheType.USER, 
user1).convert2Command();
+        Command cacheExpireCommand = new CacheExpireCommand(CacheType.USER, 
"100").convert2Command();
 
         NettyServerConfig serverConfig = new NettyServerConfig();
 

Reply via email to