This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new e083e28  [Improvement][service] Refactor service module to fix code 
smell (#4513)
e083e28 is described below

commit e083e28720a5cd5f4b2c1f8916c26a6089e7fa4e
Author: Segun Ogundipe <[email protected]>
AuthorDate: Fri Jan 29 12:44:33 2021 +0100

    [Improvement][service] Refactor service module to fix code smell (#4513)
    
    * chore: Refactor code to fix code smell in service module
    
    * chore: Add licence header to new files
    
    * chore: Fix comment from code review
    
    * chore[service]: Reduce the number of custom runtime exception to one
---
 .../dolphinscheduler/common/utils/StringUtils.java |   4 +
 .../service/bean/SpringApplicationContext.java     |   5 +-
 .../ServiceException.java}                         |  43 +-
 .../service/permission/PermissionCheck.java        |  48 +-
 .../service/process/ProcessService.java            | 307 +++++------
 .../service/quartz/ProcessScheduleJob.java         |  15 +-
 .../service/quartz/QuartzExecutors.java            | 614 +++++++++++----------
 .../service/quartz/cron/AbstractCycle.java         | 302 +++++-----
 .../service/zk/AbstractZKClient.java               | 611 ++++++++++----------
 .../service/zk/CuratorZookeeperClient.java         |  22 +-
 .../dolphinscheduler/service/zk/ZKServer.java      |  37 +-
 .../service/zk/ZookeeperCachedOperator.java        |  20 +-
 .../service/zk/ZookeeperOperator.java              |  45 +-
 13 files changed, 1071 insertions(+), 1002 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
index 6e32d12..362c613 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
@@ -61,4 +61,8 @@ public class StringUtils {
     public static String trim(String str) {
         return str == null ? null : str.trim();
     }
+
+    public static boolean equalsIgnoreCase(String str1, String str2) {
+        return str1 == null ? str2 == null : str1.equalsIgnoreCase(str2);
+    }
 }
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
index ddf1fec..484b837 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.service.bean;
 
 import org.springframework.beans.BeansException;
@@ -31,9 +32,7 @@ public class SpringApplicationContext implements 
ApplicationContextAware {
         SpringApplicationContext.applicationContext = applicationContext;
     }
 
-    public static <T> T getBean(Class<T> requiredType){
+    public static <T> T getBean(Class<T> requiredType) {
         return applicationContext.getBean(requiredType);
     }
-
-
 }
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/ServiceException.java
similarity index 51%
copy from 
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
copy to 
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/ServiceException.java
index ddf1fec..4465970 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/ServiceException.java
@@ -14,26 +14,39 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.service.bean;
 
-import org.springframework.beans.BeansException;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationContextAware;
-import org.springframework.stereotype.Component;
+package org.apache.dolphinscheduler.service.exceptions;
 
-@Component
-public class SpringApplicationContext implements ApplicationContextAware {
-
-    private static ApplicationContext applicationContext;
+/**
+ * Custom ZKServerException exception
+ */
+public class ServiceException extends RuntimeException {
 
-    @Override
-    public void setApplicationContext(ApplicationContext applicationContext) 
throws BeansException {
-        SpringApplicationContext.applicationContext = applicationContext;
+    /**
+     * Construct a new runtime exception with the error message
+     *
+     * @param errMsg Error message
+     */
+    public ServiceException(String errMsg) {
+        super(errMsg);
     }
 
-    public static <T> T getBean(Class<T> requiredType){
-        return applicationContext.getBean(requiredType);
+    /**
+     * Construct a new runtime exception with the cause
+     *
+     * @param cause cause
+     */
+    public ServiceException(Throwable cause) {
+        super(cause);
     }
 
-
+    /**
+     * Construct a new runtime exception with the detail message and cause
+     *
+     * @param errMsg message
+     * @param cause cause
+     */
+    public ServiceException(String errMsg, Throwable cause) {
+        super(errMsg, cause);
+    }
 }
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java
index 1a9295b..a8f73f0 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.service.permission;
 
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
@@ -21,11 +22,13 @@ import org.apache.dolphinscheduler.common.enums.UserType;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
 import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.slf4j.Logger;
 
 import java.util.List;
 
+import org.slf4j.Logger;
+
 public class PermissionCheck<T> {
     /**
      * logger
@@ -58,8 +61,9 @@ public class PermissionCheck<T> {
 
     /**
      * permission check
+     *
      * @param authorizationType authorization type
-     * @param processService        process dao
+     * @param processService process dao
      */
     public PermissionCheck(AuthorizationType authorizationType, ProcessService 
processService) {
         this.authorizationType = authorizationType;
@@ -68,10 +72,6 @@ public class PermissionCheck<T> {
 
     /**
      * permission check
-     * @param authorizationType
-     * @param processService
-     * @param needChecks
-     * @param userId
      */
     public PermissionCheck(AuthorizationType authorizationType, ProcessService 
processService, T[] needChecks, int userId) {
         this.authorizationType = authorizationType;
@@ -82,11 +82,6 @@ public class PermissionCheck<T> {
 
     /**
      * permission check
-     * @param authorizationType
-     * @param processService
-     * @param needChecks
-     * @param userId
-     * @param logger
      */
     public PermissionCheck(AuthorizationType authorizationType, ProcessService 
processService, T[] needChecks, int userId, Logger logger) {
         this.authorizationType = authorizationType;
@@ -98,13 +93,8 @@ public class PermissionCheck<T> {
 
     /**
      * permission check
-     * @param logger
-     * @param authorizationType
-     * @param processService
-     * @param resourceList
-     * @param userId
      */
-    public PermissionCheck(AuthorizationType authorizationType, ProcessService 
processService, List<ResourceInfo> resourceList, int userId,Logger logger) {
+    public PermissionCheck(AuthorizationType authorizationType, ProcessService 
processService, List<ResourceInfo> resourceList, int userId, Logger logger) {
         this.authorizationType = authorizationType;
         this.processService = processService;
         this.resourceList = resourceList;
@@ -154,9 +144,10 @@ public class PermissionCheck<T> {
 
     /**
      * has permission
+     *
      * @return true if has permission
      */
-    public boolean hasPermission(){
+    public boolean hasPermission() {
         try {
             checkPermission();
             return true;
@@ -167,23 +158,24 @@ public class PermissionCheck<T> {
 
     /**
      * check permission
-     * @throws Exception exception
+     *
+     * @throws ServiceException exception
      */
-    public void checkPermission() throws Exception{
-        if(this.needChecks.length > 0){
+    public void checkPermission() throws ServiceException {
+        if (this.needChecks.length > 0) {
 
             // get user type in order to judge whether the user is admin
             User user = processService.getUserById(userId);
             if (user == null) {
-                logger.error("user id {} didn't exist",userId);
-                throw new RuntimeException(String.format("user %s didn't 
exist",userId));
+                logger.error("user id {} doesn't exist", userId);
+                throw new ServiceException(String.format("user %s doesn't 
exist", userId));
             }
-            if (user.getUserType() != UserType.ADMIN_USER){
-                List<T> unauthorizedList = 
processService.listUnauthorized(userId,needChecks,authorizationType);
+            if (user.getUserType() != UserType.ADMIN_USER) {
+                List<T> unauthorizedList = 
processService.listUnauthorized(userId, needChecks, authorizationType);
                 // if exist unauthorized resource
-                if(CollectionUtils.isNotEmpty(unauthorizedList)){
-                    logger.error("user {} didn't has permission of {}: {}", 
user.getUserName(), authorizationType.getDescp(),unauthorizedList);
-                    throw new RuntimeException(String.format("user %s didn't 
has permission of %s %s", user.getUserName(), authorizationType.getDescp(), 
unauthorizedList.get(0)));
+                if (CollectionUtils.isNotEmpty(unauthorizedList)) {
+                    logger.error("user {} doesn't have permission of {}: {}", 
user.getUserName(), authorizationType.getDescp(), unauthorizedList);
+                    throw new ServiceException(String.format("user %s doesn't 
have permission of %s %s", user.getUserName(), authorizationType.getDescp(), 
unauthorizedList.get(0)));
                 }
             }
         }
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index cfe649d..7c30509 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -84,6 +84,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Date;
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -110,11 +111,11 @@ public class ProcessService {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    private final int[] stateArray = new int[] 
{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
-        ExecutionStatus.RUNNING_EXECUTION.ordinal(),
-        ExecutionStatus.DELAY_EXECUTION.ordinal(),
-        ExecutionStatus.READY_PAUSE.ordinal(),
-        ExecutionStatus.READY_STOP.ordinal()};
+    private final int[] stateArray = new 
int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
+            ExecutionStatus.RUNNING_EXECUTION.ordinal(),
+            ExecutionStatus.DELAY_EXECUTION.ordinal(),
+            ExecutionStatus.READY_PAUSE.ordinal(),
+            ExecutionStatus.READY_STOP.ordinal()};
 
     @Autowired
     private UserMapper userMapper;
@@ -158,16 +159,16 @@ public class ProcessService {
     /**
      * handle Command (construct ProcessInstance from Command) , wrapped in 
transaction
      *
-     * @param logger         logger
-     * @param host           host
+     * @param logger logger
+     * @param host host
      * @param validThreadNum validThreadNum
-     * @param command        found command
+     * @param command found command
      * @return process instance
      */
     @Transactional(rollbackFor = Exception.class)
     public ProcessInstance handleCommand(Logger logger, String host, int 
validThreadNum, Command command) {
         ProcessInstance processInstance = constructProcessInstance(command, 
host);
-        //cannot construct process instance, return null;
+        // cannot construct process instance, return null
         if (processInstance == null) {
             logger.error("scan command, command parameter is error: {}", 
command);
             moveToErrorCommand(command, "process instance is null");
@@ -201,7 +202,7 @@ public class ProcessService {
     /**
      * set process waiting thread
      *
-     * @param command         command
+     * @param command command
      * @param processInstance processInstance
      * @return process instance
      */
@@ -219,7 +220,7 @@ public class ProcessService {
     /**
      * check thread num
      *
-     * @param command        command
+     * @param command command
      * @param validThreadNum validThreadNum
      * @return if thread is enough
      */
@@ -259,7 +260,7 @@ public class ProcessService {
      */
     public Boolean verifyIsNeedCreateCommand(Command command) {
         Boolean isNeedCreate = true;
-        Map<CommandType, Integer> cmdTypeMap = new HashMap<CommandType, 
Integer>();
+        EnumMap<CommandType, Integer> cmdTypeMap = new 
EnumMap<>(CommandType.class);
         cmdTypeMap.put(CommandType.REPEAT_RUNNING, 1);
         cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1);
         cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS, 1);
@@ -296,9 +297,6 @@ public class ProcessService {
 
     /**
      * get task node list by definitionId
-     *
-     * @param defineId
-     * @return
      */
     public List<TaskNode> getTaskNodeListByDefinitionId(Integer defineId) {
         ProcessDefinition processDefinition = 
processDefineMapper.selectById(defineId);
@@ -425,7 +423,7 @@ public class ProcessService {
      * recursive query sub process definition id by parent id.
      *
      * @param parentId parentId
-     * @param ids      ids
+     * @param ids ids
      */
     public void recurseFindSubProcessId(int parentId, List<Integer> ids) {
         ProcessDefinition processDefinition = 
processDefineMapper.selectById(parentId);
@@ -435,7 +433,7 @@ public class ProcessService {
 
         List<TaskNode> taskNodeList = processData.getTasks();
 
-        if (taskNodeList != null && taskNodeList.size() > 0) {
+        if (taskNodeList != null && !taskNodeList.isEmpty()) {
 
             for (TaskNode taskNode : taskNodeList) {
                 String parameter = taskNode.getParams();
@@ -456,7 +454,7 @@ public class ProcessService {
      * create recovery waiting thread  command and delete origin command at 
the same time.
      * if the recovery command is exists, only update the field update_time
      *
-     * @param originCommand   originCommand
+     * @param originCommand originCommand
      * @param processInstance processInstance
      */
     public void createRecoveryWaitingThreadCommand(Command originCommand, 
ProcessInstance processInstance) {
@@ -473,17 +471,17 @@ public class ProcessService {
         // process instance quit by "waiting thread" state
         if (originCommand == null) {
             Command command = new Command(
-                CommandType.RECOVER_WAITTING_THREAD,
-                processInstance.getTaskDependType(),
-                processInstance.getFailureStrategy(),
-                processInstance.getExecutorId(),
-                processInstance.getProcessDefinitionId(),
-                JSONUtils.toJsonString(cmdParam),
-                processInstance.getWarningType(),
-                processInstance.getWarningGroupId(),
-                processInstance.getScheduleTime(),
-                processInstance.getWorkerGroup(),
-                processInstance.getProcessInstancePriority()
+                    CommandType.RECOVER_WAITTING_THREAD,
+                    processInstance.getTaskDependType(),
+                    processInstance.getFailureStrategy(),
+                    processInstance.getExecutorId(),
+                    processInstance.getProcessDefinitionId(),
+                    JSONUtils.toJsonString(cmdParam),
+                    processInstance.getWarningType(),
+                    processInstance.getWarningGroupId(),
+                    processInstance.getScheduleTime(),
+                    processInstance.getWorkerGroup(),
+                    processInstance.getProcessInstancePriority()
             );
             saveCommand(command);
             return;
@@ -508,16 +506,14 @@ public class ProcessService {
     /**
      * get schedule time from command
      *
-     * @param command  command
+     * @param command command
      * @param cmdParam cmdParam map
      * @return date
      */
     private Date getScheduleTime(Command command, Map<String, String> 
cmdParam) {
         Date scheduleTime = command.getScheduleTime();
-        if (scheduleTime == null) {
-            if (cmdParam != null && 
cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
-                scheduleTime = 
DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
-            }
+        if (scheduleTime == null && cmdParam != null && 
cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
+            scheduleTime = 
DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
         }
         return scheduleTime;
     }
@@ -526,8 +522,8 @@ public class ProcessService {
      * generate a new work process instance from command.
      *
      * @param processDefinition processDefinition
-     * @param command           command
-     * @param cmdParam          cmdParam map
+     * @param command command
+     * @param cmdParam cmdParam map
      * @return process instance
      */
     private ProcessInstance generateNewProcessInstance(ProcessDefinition 
processDefinition,
@@ -580,10 +576,10 @@ public class ProcessService {
 
         // curing global params
         processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
-            processDefinition.getGlobalParamMap(),
-            processDefinition.getGlobalParamList(),
-            getCommandTypeIfComplement(processInstance, command),
-            processInstance.getScheduleTime()));
+                processDefinition.getGlobalParamMap(),
+                processDefinition.getGlobalParamList(),
+                getCommandTypeIfComplement(processInstance, command),
+                processInstance.getScheduleTime()));
 
         //copy process define json to process instance
         
processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson());
@@ -603,7 +599,7 @@ public class ProcessService {
      * use definition creator's tenant.
      *
      * @param tenantId tenantId
-     * @param userId   userId
+     * @param userId userId
      * @return tenant
      */
     public Tenant getTenantForProcess(int tenantId, int userId) {
@@ -626,15 +622,15 @@ public class ProcessService {
     /**
      * check command parameters is valid
      *
-     * @param command  command
+     * @param command command
      * @param cmdParam cmdParam map
      * @return whether command param is valid
      */
     private Boolean checkCmdParam(Command command, Map<String, String> 
cmdParam) {
         if (command.getTaskDependType() == TaskDependType.TASK_ONLY || 
command.getTaskDependType() == TaskDependType.TASK_PRE) {
             if (cmdParam == null
-                || !cmdParam.containsKey(Constants.CMD_PARAM_START_NODE_NAMES)
-                || 
cmdParam.get(Constants.CMD_PARAM_START_NODE_NAMES).isEmpty()) {
+                    || 
!cmdParam.containsKey(Constants.CMD_PARAM_START_NODE_NAMES)
+                    || 
cmdParam.get(Constants.CMD_PARAM_START_NODE_NAMES).isEmpty()) {
                 logger.error("command node depend type is {}, but start nodes 
is null ", command.getTaskDependType());
                 return false;
             }
@@ -646,7 +642,7 @@ public class ProcessService {
      * construct process instance according to one command.
      *
      * @param command command
-     * @param host    host
+     * @param host host
      * @return process instance
      */
     private ProcessInstance constructProcessInstance(Command command, String 
host) {
@@ -714,7 +710,7 @@ public class ProcessService {
             // generate one new process instance
             processInstance = generateNewProcessInstance(processDefinition, 
command, cmdParam);
         }
-        if (!checkCmdParam(command, cmdParam)) {
+        if (Boolean.FALSE.equals(checkCmdParam(command, cmdParam))) {
             logger.error("command parameter check failed!");
             return null;
         }
@@ -742,7 +738,7 @@ public class ProcessService {
                     initTaskInstance(this.findTaskInstanceById(taskId));
                 }
                 cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING,
-                    String.join(Constants.COMMA, 
convertIntListToString(failedList)));
+                        String.join(Constants.COMMA, 
convertIntListToString(failedList)));
                 
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
                 processInstance.setRunTimes(runTime + 1);
                 break;
@@ -755,7 +751,7 @@ public class ProcessService {
                 
cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
                 List<Integer> suspendedNodeList = 
this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE);
                 List<Integer> stopNodeList = 
findTaskIdByInstanceState(processInstance.getId(),
-                    ExecutionStatus.KILL);
+                        ExecutionStatus.KILL);
                 suspendedNodeList.addAll(stopNodeList);
                 for (Integer taskId : suspendedNodeList) {
                     // initialize the pause state
@@ -809,7 +805,7 @@ public class ProcessService {
      * return complement data if the process start with complement data
      *
      * @param processInstance processInstance
-     * @param command         command
+     * @param command command
      * @return command type
      */
     private CommandType getCommandTypeIfComplement(ProcessInstance 
processInstance, Command command) {
@@ -824,8 +820,8 @@ public class ProcessService {
      * initialize complement data parameters
      *
      * @param processDefinition processDefinition
-     * @param processInstance   processInstance
-     * @param cmdParam          cmdParam
+     * @param processInstance processInstance
+     * @param cmdParam cmdParam
      */
     private void initComplementDataParam(ProcessDefinition processDefinition,
                                          ProcessInstance processInstance,
@@ -835,14 +831,14 @@ public class ProcessService {
         }
 
         Date startComplementTime = 
DateUtils.parse(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE),
-            YYYY_MM_DD_HH_MM_SS);
+                YYYY_MM_DD_HH_MM_SS);
         if (Flag.NO == processInstance.getIsSubProcess()) {
             processInstance.setScheduleTime(startComplementTime);
         }
         processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
-            processDefinition.getGlobalParamMap(),
-            processDefinition.getGlobalParamList(),
-            CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
+                processDefinition.getGlobalParamMap(),
+                processDefinition.getGlobalParamList(),
+                CommandType.COMPLEMENT_DATA, 
processInstance.getScheduleTime()));
 
     }
 
@@ -862,7 +858,7 @@ public class ProcessService {
         Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
         // write sub process id into cmd param.
         if (paramMap.containsKey(CMD_PARAM_SUB_PROCESS)
-            && 
CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) {
+                && 
CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) {
             paramMap.remove(CMD_PARAM_SUB_PROCESS);
             paramMap.put(CMD_PARAM_SUB_PROCESS, 
String.valueOf(subProcessInstance.getId()));
             
subProcessInstance.setCommandParam(JSONUtils.toJsonString(paramMap));
@@ -875,7 +871,7 @@ public class ProcessService {
             ProcessInstance parentInstance = 
findProcessInstanceDetailById(Integer.parseInt(parentInstanceId));
             if (parentInstance != null) {
                 subProcessInstance.setGlobalParams(
-                    joinGlobalParams(parentInstance.getGlobalParams(), 
subProcessInstance.getGlobalParams()));
+                        joinGlobalParams(parentInstance.getGlobalParams(), 
subProcessInstance.getGlobalParams()));
                 this.saveProcessInstance(subProcessInstance);
             } else {
                 logger.error("sub process command params error, cannot find 
parent instance: {} ", cmdParam);
@@ -897,7 +893,7 @@ public class ProcessService {
      * only the keys doesn't in sub process global would be joined.
      *
      * @param parentGlobalParams parentGlobalParams
-     * @param subGlobalParams    subGlobalParams
+     * @param subGlobalParams subGlobalParams
      * @return global params join
      */
     private String joinGlobalParams(String parentGlobalParams, String 
subGlobalParams) {
@@ -922,12 +918,11 @@ public class ProcessService {
      */
     private void initTaskInstance(TaskInstance taskInstance) {
 
-        if (!taskInstance.isSubProcess()) {
-            if (taskInstance.getState().typeIsCancel() || 
taskInstance.getState().typeIsFailure()) {
-                taskInstance.setFlag(Flag.NO);
-                updateTaskInstance(taskInstance);
-                return;
-            }
+        if (!taskInstance.isSubProcess()
+                && (taskInstance.getState().typeIsCancel() || 
taskInstance.getState().typeIsFailure())) {
+            taskInstance.setFlag(Flag.NO);
+            updateTaskInstance(taskInstance);
+            return;
         }
         taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
         updateTaskInstance(taskInstance);
@@ -944,12 +939,12 @@ public class ProcessService {
     public TaskInstance submitTask(TaskInstance taskInstance) {
         ProcessInstance processInstance = 
this.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
         logger.info("start submit task : {}, instance id:{}, state: {}",
-            taskInstance.getName(), taskInstance.getProcessInstanceId(), 
processInstance.getState());
+                taskInstance.getName(), taskInstance.getProcessInstanceId(), 
processInstance.getState());
         //submit to db
         TaskInstance task = submitTaskInstanceToDB(taskInstance, 
processInstance);
         if (task == null) {
             logger.error("end submit task to db error, task name:{}, process 
id:{} state: {} ",
-                taskInstance.getName(), taskInstance.getProcessInstance(), 
processInstance.getState());
+                    taskInstance.getName(), taskInstance.getProcessInstance(), 
processInstance.getState());
             return task;
         }
         if (!task.getState().typeIsFinished()) {
@@ -957,7 +952,7 @@ public class ProcessService {
         }
 
         logger.info("end submit task to db successfully:{} state:{} complete, 
instance id:{} state: {}  ",
-            taskInstance.getName(), task.getState(), processInstance.getId(), 
processInstance.getState());
+                taskInstance.getName(), task.getState(), 
processInstance.getId(), processInstance.getState());
         return task;
     }
 
@@ -968,7 +963,7 @@ public class ProcessService {
      * set map {parent instance id, task instance id, 0(child instance id)}
      *
      * @param parentInstance parentInstance
-     * @param parentTask     parentTask
+     * @param parentTask parentTask
      * @return process instance map
      */
     private ProcessInstanceMap setProcessInstanceMap(ProcessInstance 
parentInstance, TaskInstance parentTask) {
@@ -997,7 +992,7 @@ public class ProcessService {
      * find previous task work process map.
      *
      * @param parentProcessInstance parentProcessInstance
-     * @param parentTask            parentTask
+     * @param parentTask parentTask
      * @return process instance map
      */
     private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance 
parentProcessInstance,
@@ -1015,7 +1010,7 @@ public class ProcessService {
             }
         }
         logger.info("sub process instance is not found,parent task:{},parent 
instance:{}",
-            parentTask.getId(), parentProcessInstance.getId());
+                parentTask.getId(), parentProcessInstance.getId());
         return null;
     }
 
@@ -1049,10 +1044,6 @@ public class ProcessService {
 
     /**
      * complement data needs transform parent parameter to child.
-     *
-     * @param instanceMap
-     * @param parentProcessInstance
-     * @return
      */
     private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, 
ProcessInstance parentProcessInstance) {
         // set sub work process command
@@ -1071,11 +1062,6 @@ public class ProcessService {
 
     /**
      * create sub work process command
-     *
-     * @param parentProcessInstance
-     * @param childInstance
-     * @param instanceMap
-     * @param task
      */
     public Command createSubProcessCommand(ProcessInstance 
parentProcessInstance,
                                            ProcessInstance childInstance,
@@ -1088,25 +1074,23 @@ public class ProcessService {
         String processParam = getSubWorkFlowParam(instanceMap, 
parentProcessInstance);
 
         return new Command(
-            commandType,
-            TaskDependType.TASK_POST,
-            parentProcessInstance.getFailureStrategy(),
-            parentProcessInstance.getExecutorId(),
-            childDefineId,
-            processParam,
-            parentProcessInstance.getWarningType(),
-            parentProcessInstance.getWarningGroupId(),
-            parentProcessInstance.getScheduleTime(),
-            task.getWorkerGroup(),
-            parentProcessInstance.getProcessInstancePriority()
+                commandType,
+                TaskDependType.TASK_POST,
+                parentProcessInstance.getFailureStrategy(),
+                parentProcessInstance.getExecutorId(),
+                childDefineId,
+                processParam,
+                parentProcessInstance.getWarningType(),
+                parentProcessInstance.getWarningGroupId(),
+                parentProcessInstance.getScheduleTime(),
+                task.getWorkerGroup(),
+                parentProcessInstance.getProcessInstancePriority()
         );
     }
 
     /**
      * initialize sub work flow state
      * child instance state would be initialized when 'recovery from 
pause/stop/failure'
-     *
-     * @param childInstance
      */
     private void initSubInstanceState(ProcessInstance childInstance) {
         if (childInstance != null) {
@@ -1119,9 +1103,6 @@ public class ProcessService {
      * get sub work flow command type
      * child instance exist: child command = fatherCommand
      * child instance not exists: child command = fatherCommand[0]
-     *
-     * @param parentProcessInstance
-     * @return
      */
     private CommandType getSubCommandType(ProcessInstance 
parentProcessInstance, ProcessInstance childInstance) {
         CommandType commandType = parentProcessInstance.getCommandType();
@@ -1136,7 +1117,7 @@ public class ProcessService {
      * update sub process definition
      *
      * @param parentProcessInstance parentProcessInstance
-     * @param childDefinitionId     childDefinitionId
+     * @param childDefinitionId childDefinitionId
      */
     private void updateSubProcessDefinitionByParent(ProcessInstance 
parentProcessInstance, int childDefinitionId) {
         ProcessDefinition fatherDefinition = 
this.findProcessDefineById(parentProcessInstance.getProcessDefinitionId());
@@ -1150,7 +1131,7 @@ public class ProcessService {
     /**
      * submit task to mysql
      *
-     * @param taskInstance    taskInstance
+     * @param taskInstance taskInstance
      * @param processInstance processInstance
      * @return task instance
      */
@@ -1163,7 +1144,7 @@ public class ProcessService {
             } else {
 
                 if (processInstanceState != ExecutionStatus.READY_STOP
-                    && processInstanceState != ExecutionStatus.READY_PAUSE) {
+                        && processInstanceState != 
ExecutionStatus.READY_PAUSE) {
                     // failure task set invalid
                     taskInstance.setFlag(Flag.NO);
                     updateTaskInstance(taskInstance);
@@ -1204,19 +1185,19 @@ public class ProcessService {
      * return stop if work process state is ready stop
      * if all of above are not satisfied, return submit success
      *
-     * @param taskInstance         taskInstance
+     * @param taskInstance taskInstance
      * @param processInstanceState processInstanceState
      * @return process instance state
      */
     public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, 
ExecutionStatus processInstanceState) {
         ExecutionStatus state = taskInstance.getState();
         if (
-            // running, delayed or killed
-            // the task already exists in task queue
-            // return state
-            state == ExecutionStatus.RUNNING_EXECUTION
-                || state == ExecutionStatus.DELAY_EXECUTION
-                || state == ExecutionStatus.KILL
+                // running, delayed or killed
+                // the task already exists in task queue
+                // return state
+                state == ExecutionStatus.RUNNING_EXECUTION
+                        || state == ExecutionStatus.DELAY_EXECUTION
+                        || state == ExecutionStatus.KILL
         ) {
             return state;
         }
@@ -1225,7 +1206,7 @@ public class ProcessService {
         if (processInstanceState == ExecutionStatus.READY_PAUSE) {
             state = ExecutionStatus.PAUSE;
         } else if (processInstanceState == ExecutionStatus.READY_STOP
-            || !checkProcessStrategy(taskInstance)) {
+                || !checkProcessStrategy(taskInstance)) {
             state = ExecutionStatus.KILL;
         } else {
             state = ExecutionStatus.SUBMITTED_SUCCESS;
@@ -1380,7 +1361,7 @@ public class ProcessService {
      * get id list by task state
      *
      * @param instanceId instanceId
-     * @param state      state
+     * @param state state
      * @return task instance states
      */
     public List<Integer> findTaskIdByInstanceState(int instanceId, 
ExecutionStatus state) {
@@ -1435,7 +1416,7 @@ public class ProcessService {
      * find work process map by parent process id and parent task id.
      *
      * @param parentWorkProcessId parentWorkProcessId
-     * @param parentTaskId        parentTaskId
+     * @param parentTaskId parentTaskId
      * @return process instance map
      */
     public ProcessInstanceMap findWorkProcessMapByParent(Integer 
parentWorkProcessId, Integer parentTaskId) {
@@ -1457,7 +1438,7 @@ public class ProcessService {
      * find sub process instance
      *
      * @param parentProcessId parentProcessId
-     * @param parentTaskId    parentTaskId
+     * @param parentTaskId parentTaskId
      * @return process instance
      */
     public ProcessInstance findSubProcessInstance(Integer parentProcessId, 
Integer parentTaskId) {
@@ -1489,12 +1470,12 @@ public class ProcessService {
     /**
      * change task state
      *
-     * @param state       state
-     * @param startTime   startTime
-     * @param host        host
+     * @param state state
+     * @param startTime startTime
+     * @param host host
      * @param executePath executePath
-     * @param logPath     logPath
-     * @param taskInstId  taskInstId
+     * @param logPath logPath
+     * @param taskInstId taskInstId
      */
     public void changeTaskState(TaskInstance taskInstance, ExecutionStatus 
state, Date startTime, String host,
                                 String executePath,
@@ -1522,12 +1503,12 @@ public class ProcessService {
      * update the process instance
      *
      * @param processInstanceId processInstanceId
-     * @param processJson       processJson
-     * @param globalParams      globalParams
-     * @param scheduleTime      scheduleTime
-     * @param flag              flag
-     * @param locations         locations
-     * @param connects          connects
+     * @param processJson processJson
+     * @param globalParams globalParams
+     * @param scheduleTime scheduleTime
+     * @param flag flag
+     * @param locations locations
+     * @param connects connects
      * @return update process instance result
      */
     public int updateProcessInstance(Integer processInstanceId, String 
processJson,
@@ -1548,10 +1529,10 @@ public class ProcessService {
     /**
      * change task state
      *
-     * @param state      state
-     * @param endTime    endTime
+     * @param state state
+     * @param endTime endTime
      * @param taskInstId taskInstId
-     * @param varPool    varPool
+     * @param varPool varPool
      */
     public void changeTaskState(TaskInstance taskInstance, ExecutionStatus 
state,
                                 Date endTime,
@@ -1577,7 +1558,7 @@ public class ProcessService {
         if (intList == null) {
             return new ArrayList<>();
         }
-        List<String> result = new ArrayList<String>(intList.size());
+        List<String> result = new ArrayList<>(intList.size());
         for (Integer intVar : intList) {
             result.add(String.valueOf(intVar));
         }
@@ -1642,7 +1623,7 @@ public class ProcessService {
      */
     public List<TaskInstance> queryNeedFailoverTaskInstances(String host) {
         return taskInstanceMapper.queryByHostAndStatus(host,
-            stateArray);
+                stateArray);
     }
 
     /**
@@ -1659,7 +1640,7 @@ public class ProcessService {
      * update process instance state by id
      *
      * @param processInstanceId processInstanceId
-     * @param executionStatus   executionStatus
+     * @param executionStatus executionStatus
      * @return update process result
      */
     public int updateProcessInstanceState(Integer processInstanceId, 
ExecutionStatus executionStatus) {
@@ -1696,7 +1677,7 @@ public class ProcessService {
     /**
      * find tenant code by resource name
      *
-     * @param resName      resource name
+     * @param resName resource name
      * @param resourceType resource type
      * @return tenant code
      */
@@ -1714,35 +1695,35 @@ public class ProcessService {
      */
     public List<Schedule> selectAllByProcessDefineId(int[] ids) {
         return scheduleMapper.selectAllByProcessDefineArray(
-            ids);
+                ids);
     }
 
     /**
      * get dependency cycle by work process define id and scheduler fire time
      *
-     * @param masterId            masterId
+     * @param masterId masterId
      * @param processDefinitionId processDefinitionId
-     * @param scheduledFireTime   the time the task schedule is expected to 
trigger
+     * @param scheduledFireTime the time the task schedule is expected to 
trigger
      * @return CycleDependency
      * @throws Exception if error throws Exception
      */
     public CycleDependency getCycleDependency(int masterId, int 
processDefinitionId, Date scheduledFireTime) throws Exception {
-        List<CycleDependency> list = getCycleDependencies(masterId, new int[] 
{processDefinitionId}, scheduledFireTime);
-        return list.size() > 0 ? list.get(0) : null;
+        List<CycleDependency> list = getCycleDependencies(masterId, new 
int[]{processDefinitionId}, scheduledFireTime);
+        return !list.isEmpty() ? list.get(0) : null;
 
     }
 
     /**
      * get dependency cycle list by work process define id list and scheduler 
fire time
      *
-     * @param masterId          masterId
-     * @param ids               ids
+     * @param masterId masterId
+     * @param ids ids
      * @param scheduledFireTime the time the task schedule is expected to 
trigger
      * @return CycleDependency list
      * @throws Exception if error throws Exception
      */
     public List<CycleDependency> getCycleDependencies(int masterId, int[] ids, 
Date scheduledFireTime) throws Exception {
-        List<CycleDependency> cycleDependencyList = new 
ArrayList<CycleDependency>();
+        List<CycleDependency> cycleDependencyList = new ArrayList<>();
         if (null == ids || ids.length == 0) {
             logger.warn("ids[] is empty!is invalid!");
             return cycleDependencyList;
@@ -1769,14 +1750,10 @@ public class ProcessService {
             }
             Calendar calendar = Calendar.getInstance();
             switch (cycleEnum) {
-                /*case MINUTE:
-                    calendar.add(Calendar.MINUTE,-61);*/
                 case HOUR:
                     calendar.add(Calendar.HOUR, -25);
                     break;
                 case DAY:
-                    calendar.add(Calendar.DATE, -32);
-                    break;
                 case WEEK:
                     calendar.add(Calendar.DATE, -32);
                     break;
@@ -1784,7 +1761,8 @@ public class ProcessService {
                     calendar.add(Calendar.MONTH, -13);
                     break;
                 default:
-                    logger.warn("Dependent process definition's  cycleEnum is 
{},not support!!", cycleEnum.name());
+                    String cycleName = cycleEnum.name();
+                    logger.warn("Dependent process definition's  cycleEnum is 
{},not support!!", cycleName);
                     continue;
             }
             Date start = calendar.getTime();
@@ -1794,7 +1772,7 @@ public class ProcessService {
             } else {
                 list = CronUtils.getFireDateList(start, scheduledFireTime, 
depCronExpression);
             }
-            if (list.size() >= 1) {
+            if (!list.isEmpty()) {
                 start = list.get(list.size() - 1);
                 CycleDependency dependency = new 
CycleDependency(depSchedule.getProcessDefinitionId(), start, 
CronUtils.getExpirationTime(start, cycleEnum), cycleEnum);
                 cycleDependencyList.add(dependency);
@@ -1813,8 +1791,8 @@ public class ProcessService {
      */
     public ProcessInstance findLastSchedulerProcessInterval(int definitionId, 
DateInterval dateInterval) {
         return processInstanceMapper.queryLastSchedulerProcess(definitionId,
-            dateInterval.getStartTime(),
-            dateInterval.getEndTime());
+                dateInterval.getStartTime(),
+                dateInterval.getEndTime());
     }
 
     /**
@@ -1826,23 +1804,23 @@ public class ProcessService {
      */
     public ProcessInstance findLastManualProcessInterval(int definitionId, 
DateInterval dateInterval) {
         return processInstanceMapper.queryLastManualProcess(definitionId,
-            dateInterval.getStartTime(),
-            dateInterval.getEndTime());
+                dateInterval.getStartTime(),
+                dateInterval.getEndTime());
     }
 
     /**
      * find last running process instance
      *
      * @param definitionId process definition id
-     * @param startTime    start time
-     * @param endTime      end time
+     * @param startTime start time
+     * @param endTime end time
      * @return process instance
      */
     public ProcessInstance findLastRunningProcess(int definitionId, Date 
startTime, Date endTime) {
         return processInstanceMapper.queryLastRunningProcess(definitionId,
-            startTime,
-            endTime,
-            stateArray);
+                startTime,
+                endTime,
+                stateArray);
     }
 
     /**
@@ -1867,6 +1845,7 @@ public class ProcessService {
 
     /**
      * query project name and user name by processInstanceId.
+     *
      * @param processInstanceId processInstanceId
      * @return projectName and userName
      */
@@ -1934,35 +1913,32 @@ public class ProcessService {
     /**
      * list unauthorized udf function
      *
-     * @param userId     user id
+     * @param userId user id
      * @param needChecks data source id array
      * @return unauthorized udf function list
      */
     public <T> List<T> listUnauthorized(int userId, T[] needChecks, 
AuthorizationType authorizationType) {
-        List<T> resultList = new ArrayList<T>();
+        List<T> resultList = new ArrayList<>();
 
         if (Objects.nonNull(needChecks) && needChecks.length > 0) {
-            Set<T> originResSet = new HashSet<T>(Arrays.asList(needChecks));
+            Set<T> originResSet = new HashSet<>(Arrays.asList(needChecks));
 
             switch (authorizationType) {
                 case RESOURCE_FILE_ID:
-                    Set<Integer> authorizedResourceFiles = 
resourceMapper.listAuthorizedResourceById(userId, needChecks).stream().map(t -> 
t.getId()).collect(toSet());
+                case UDF_FILE:
+                    Set<Integer> authorizedResourceFiles = 
resourceMapper.listAuthorizedResourceById(userId, 
needChecks).stream().map(Resource::getId).collect(toSet());
                     originResSet.removeAll(authorizedResourceFiles);
                     break;
                 case RESOURCE_FILE_NAME:
-                    Set<String> authorizedResources = 
resourceMapper.listAuthorizedResource(userId, needChecks).stream().map(t -> 
t.getFullName()).collect(toSet());
+                    Set<String> authorizedResources = 
resourceMapper.listAuthorizedResource(userId, 
needChecks).stream().map(Resource::getFullName).collect(toSet());
                     originResSet.removeAll(authorizedResources);
                     break;
-                case UDF_FILE:
-                    Set<Integer> authorizedUdfFiles = 
resourceMapper.listAuthorizedResourceById(userId, needChecks).stream().map(t -> 
t.getId()).collect(toSet());
-                    originResSet.removeAll(authorizedUdfFiles);
-                    break;
                 case DATASOURCE:
-                    Set<Integer> authorizedDatasources = 
dataSourceMapper.listAuthorizedDataSource(userId, needChecks).stream().map(t -> 
t.getId()).collect(toSet());
+                    Set<Integer> authorizedDatasources = 
dataSourceMapper.listAuthorizedDataSource(userId, 
needChecks).stream().map(DataSource::getId).collect(toSet());
                     originResSet.removeAll(authorizedDatasources);
                     break;
                 case UDF:
-                    Set<Integer> authorizedUdfs = 
udfFuncMapper.listAuthorizedUdfFunc(userId, needChecks).stream().map(t -> 
t.getId()).collect(toSet());
+                    Set<Integer> authorizedUdfs = 
udfFuncMapper.listAuthorizedUdfFunc(userId, 
needChecks).stream().map(UdfFunc::getId).collect(toSet());
                     originResSet.removeAll(authorizedUdfs);
                     break;
                 default:
@@ -2007,9 +1983,6 @@ public class ProcessService {
 
     /**
      * format task app id in task instance
-     *
-     * @param taskInstance
-     * @return
      */
     public String formatTaskAppId(TaskInstance taskInstance) {
         ProcessDefinition definition = 
this.findProcessDefineById(taskInstance.getProcessDefinitionId());
@@ -2019,9 +1992,9 @@ public class ProcessService {
             return "";
         }
         return String.format("%s_%s_%s",
-            definition.getId(),
-            processInstanceById.getId(),
-            taskInstance.getId());
+                definition.getId(),
+                processInstanceById.getId(),
+                taskInstance.getId());
     }
 
 }
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
index 6ac847b..2921ce2 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
@@ -14,8 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.service.quartz;
 
+package org.apache.dolphinscheduler.service.quartz;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -25,6 +25,9 @@ import 
org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.Date;
+
 import org.quartz.Job;
 import org.quartz.JobDataMap;
 import org.quartz.JobExecutionContext;
@@ -34,8 +37,6 @@ import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 import org.springframework.util.StringUtils;
 
-import java.util.Date;
-
 /**
  * process schedule job
  */
@@ -46,7 +47,7 @@ public class ProcessScheduleJob implements Job {
      */
     private static final Logger logger = 
LoggerFactory.getLogger(ProcessScheduleJob.class);
 
-    public ProcessService getProcessService(){
+    public ProcessService getProcessService() {
         return SpringApplicationContext.getBean(ProcessService.class);
     }
 
@@ -66,10 +67,8 @@ public class ProcessScheduleJob implements Job {
         int projectId = dataMap.getInt(Constants.PROJECT_ID);
         int scheduleId = dataMap.getInt(Constants.SCHEDULE_ID);
 
-
         Date scheduledFireTime = context.getScheduledFireTime();
 
-
         Date fireTime = context.getFireTime();
 
         logger.info("scheduled fire time :{}, fire time :{}, process id :{}", 
scheduledFireTime, fireTime, scheduleId);
@@ -82,11 +81,10 @@ public class ProcessScheduleJob implements Job {
             return;
         }
 
-
         ProcessDefinition processDefinition = 
getProcessService().findProcessDefineById(schedule.getProcessDefinitionId());
         // release state : online/offline
         ReleaseState releaseState = processDefinition.getReleaseState();
-        if (processDefinition == null || releaseState == ReleaseState.OFFLINE) 
{
+        if (releaseState == ReleaseState.OFFLINE) {
             logger.warn("process definition does not exist in db or 
offline,need not to create command, projectId:{}, processId:{}", projectId, 
scheduleId);
             return;
         }
@@ -107,7 +105,6 @@ public class ProcessScheduleJob implements Job {
         getProcessService().createCommand(command);
     }
 
-
     /**
      * delete job
      */
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
index 3b15810..fd91e40 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
@@ -14,15 +14,76 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.service.quartz;
 
+import static 
org.apache.dolphinscheduler.common.Constants.ORG_POSTGRESQL_DRIVER;
+import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS;
+import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK;
+import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_CLASS;
+import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL;
+import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_DATASOURCE;
+import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS;
+import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_ISCLUSTERED;
+import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD;
+import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_TABLEPREFIX;
+import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_USEPROPERTIES;
+import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_INSTANCEID;
+import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_INSTANCENAME;
+import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON;
+import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_CLASS;
+import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS;
+import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_THREADCOUNT;
+import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_THREADPRIORITY;
+import static org.apache.dolphinscheduler.common.Constants.PROJECT_ID;
+import static 
org.apache.dolphinscheduler.common.Constants.QUARTZ_ACQUIRETRIGGERSWITHINLOCK;
+import static 
org.apache.dolphinscheduler.common.Constants.QUARTZ_CLUSTERCHECKININTERVAL;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_DATASOURCE;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_INSTANCEID;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_INSTANCENAME;
+import static 
org.apache.dolphinscheduler.common.Constants.QUARTZ_JOB_GROUP_PRIFIX;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_JOB_PRIFIX;
+import static 
org.apache.dolphinscheduler.common.Constants.QUARTZ_MISFIRETHRESHOLD;
+import static 
org.apache.dolphinscheduler.common.Constants.QUARTZ_PROPERTIES_PATH;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_TABLE_PREFIX;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_THREADCOUNT;
+import static 
org.apache.dolphinscheduler.common.Constants.QUARTZ_THREADPRIORITY;
+import static org.apache.dolphinscheduler.common.Constants.SCHEDULE;
+import static org.apache.dolphinscheduler.common.Constants.SCHEDULE_ID;
+import static 
org.apache.dolphinscheduler.common.Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME;
+import static org.apache.dolphinscheduler.common.Constants.STRING_FALSE;
+import static org.apache.dolphinscheduler.common.Constants.STRING_TRUE;
+import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
+
+import static org.quartz.CronScheduleBuilder.cronSchedule;
+import static org.quartz.JobBuilder.newJob;
+import static org.quartz.TriggerBuilder.newTrigger;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
+
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.lang.StringUtils;
-import org.apache.dolphinscheduler.common.utils.*;
-import org.apache.dolphinscheduler.dao.entity.Schedule;
-import org.quartz.*;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.quartz.CronTrigger;
+import org.quartz.Job;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.TriggerKey;
 import org.quartz.impl.StdSchedulerFactory;
 import org.quartz.impl.jdbcjobstore.JobStoreTX;
 import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate;
@@ -32,300 +93,289 @@ import org.quartz.simpl.SimpleThreadPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.dolphinscheduler.common.Constants.*;
-import static org.quartz.CronScheduleBuilder.cronSchedule;
-import static org.quartz.JobBuilder.newJob;
-import static org.quartz.TriggerBuilder.newTrigger;
-
 /**
  * single Quartz executors instance
  */
 public class QuartzExecutors {
 
-  /**
-   * logger of QuartzExecutors
-   */
-  private static final Logger logger = 
LoggerFactory.getLogger(QuartzExecutors.class);
-
-  /**
-   * read write lock
-   */
-  private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
-  /**
-   * A Scheduler maintains a registry of org.quartz.JobDetail and Trigger.
-   */
-  private static Scheduler scheduler;
-
-  /**
-   * load conf
-   */
-  private static Configuration conf;
-
-  private static final class Holder {
-    private static final QuartzExecutors instance = new QuartzExecutors();
-  }
-
-
-  private QuartzExecutors() {
-    try {
-      conf = new PropertiesConfiguration(QUARTZ_PROPERTIES_PATH);
-      init();
-    }catch (ConfigurationException e){
-      logger.warn("not loaded quartz configuration file, will used default 
value",e);
+    /**
+     * logger of QuartzExecutors
+     */
+    private static final Logger logger = 
LoggerFactory.getLogger(QuartzExecutors.class);
+
+    /**
+     * read write lock
+     */
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /**
+     * A Scheduler maintains a registry of org.quartz.JobDetail and Trigger.
+     */
+    private static Scheduler scheduler;
+
+    /**
+     * load conf
+     */
+    private static Configuration conf;
+
+    private static final class Holder {
+        private static final QuartzExecutors instance = new QuartzExecutors();
     }
-  }
-
-  /**
-   * thread safe and performance promote
-   * @return instance of Quartz Executors
-   */
-  public static QuartzExecutors getInstance() {
-   return Holder.instance;
-  }
-
-
-  /**
-   * init
-   *
-   * Returns a client-usable handle to a Scheduler.
-   */
-  private void init() {
-    try {
-      StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
-      Properties properties = new Properties();
-
-      String dataSourceDriverClass = 
org.apache.dolphinscheduler.dao.utils.PropertyUtils.getString(SPRING_DATASOURCE_DRIVER_CLASS_NAME);
-      if (dataSourceDriverClass.equals(ORG_POSTGRESQL_DRIVER)){
-        
properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,
 PostgreSQLDelegate.class.getName()));
-      } else {
-        
properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,
 StdJDBCDelegate.class.getName()));
-      }
-      properties.setProperty(ORG_QUARTZ_SCHEDULER_INSTANCENAME, 
conf.getString(ORG_QUARTZ_SCHEDULER_INSTANCENAME, QUARTZ_INSTANCENAME));
-      properties.setProperty(ORG_QUARTZ_SCHEDULER_INSTANCEID, 
conf.getString(ORG_QUARTZ_SCHEDULER_INSTANCEID, QUARTZ_INSTANCEID));
-      
properties.setProperty(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON,conf.getString(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON,STRING_TRUE));
-      
properties.setProperty(ORG_QUARTZ_JOBSTORE_USEPROPERTIES,conf.getString(ORG_QUARTZ_JOBSTORE_USEPROPERTIES,STRING_FALSE));
-      
properties.setProperty(ORG_QUARTZ_THREADPOOL_CLASS,conf.getString(ORG_QUARTZ_THREADPOOL_CLASS,
 SimpleThreadPool.class.getName()));
-      
properties.setProperty(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS,conf.getString(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS,STRING_TRUE));
-      
properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADCOUNT,conf.getString(ORG_QUARTZ_THREADPOOL_THREADCOUNT,
 QUARTZ_THREADCOUNT));
-      
properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADPRIORITY,conf.getString(ORG_QUARTZ_THREADPOOL_THREADPRIORITY,
 QUARTZ_THREADPRIORITY));
-      
properties.setProperty(ORG_QUARTZ_JOBSTORE_CLASS,conf.getString(ORG_QUARTZ_JOBSTORE_CLASS,
 JobStoreTX.class.getName()));
-      
properties.setProperty(ORG_QUARTZ_JOBSTORE_TABLEPREFIX,conf.getString(ORG_QUARTZ_JOBSTORE_TABLEPREFIX,
 QUARTZ_TABLE_PREFIX));
-      
properties.setProperty(ORG_QUARTZ_JOBSTORE_ISCLUSTERED,conf.getString(ORG_QUARTZ_JOBSTORE_ISCLUSTERED,STRING_TRUE));
-      
properties.setProperty(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD,conf.getString(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD,
 QUARTZ_MISFIRETHRESHOLD));
-      
properties.setProperty(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL,conf.getString(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL,
 QUARTZ_CLUSTERCHECKININTERVAL));
-      
properties.setProperty(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK,conf.getString(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK,
 QUARTZ_ACQUIRETRIGGERSWITHINLOCK));
-      
properties.setProperty(ORG_QUARTZ_JOBSTORE_DATASOURCE,conf.getString(ORG_QUARTZ_JOBSTORE_DATASOURCE,
 QUARTZ_DATASOURCE));
-      
properties.setProperty(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,conf.getString(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,DruidConnectionProvider.class.getName()));
-
-      schedulerFactory.initialize(properties);
-      scheduler = schedulerFactory.getScheduler();
-
-    } catch (SchedulerException e) {
-      logger.error(e.getMessage(),e);
-      System.exit(1);
+
+    private QuartzExecutors() {
+        try {
+            conf = new PropertiesConfiguration(QUARTZ_PROPERTIES_PATH);
+            init();
+        } catch (ConfigurationException e) {
+            logger.warn("not loaded quartz configuration file, will used 
default value", e);
+        }
     }
 
-  }
-
-  /**
-   * Whether the scheduler has been started.
-   *
-   * @throws SchedulerException scheduler exception
-   */
-  public void start() throws SchedulerException {
-    if (!scheduler.isStarted()){
-      scheduler.start();
-      logger.info("Quartz service started" );
+    /**
+     * thread safe and performance promote
+     *
+     * @return instance of Quartz Executors
+     */
+    public static QuartzExecutors getInstance() {
+        return Holder.instance;
     }
-  }
-
-  /**
-   * stop all scheduled tasks
-   *
-   * Halts the Scheduler's firing of Triggers,
-   * and cleans up all resources associated with the Scheduler.
-   *
-   * The scheduler cannot be re-started.
-   * @throws SchedulerException scheduler exception
-   */
-  public void shutdown() throws SchedulerException {
-    if (!scheduler.isShutdown()) {
-        // don't wait for the task to complete
-        scheduler.shutdown();
-        logger.info("Quartz service stopped, and halt all tasks");
+
+    /**
+     * init
+     * <p>
+     * Returns a client-usable handle to a Scheduler.
+     */
+    private void init() {
+        try {
+            StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
+            Properties properties = new Properties();
+
+            String dataSourceDriverClass = 
org.apache.dolphinscheduler.dao.utils.PropertyUtils.getString(SPRING_DATASOURCE_DRIVER_CLASS_NAME);
+            if (dataSourceDriverClass.equals(ORG_POSTGRESQL_DRIVER)) {
+                
properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, 
conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, 
PostgreSQLDelegate.class.getName()));
+            } else {
+                
properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, 
conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, 
StdJDBCDelegate.class.getName()));
+            }
+            properties.setProperty(ORG_QUARTZ_SCHEDULER_INSTANCENAME, 
conf.getString(ORG_QUARTZ_SCHEDULER_INSTANCENAME, QUARTZ_INSTANCENAME));
+            properties.setProperty(ORG_QUARTZ_SCHEDULER_INSTANCEID, 
conf.getString(ORG_QUARTZ_SCHEDULER_INSTANCEID, QUARTZ_INSTANCEID));
+            
properties.setProperty(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON, 
conf.getString(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON, STRING_TRUE));
+            properties.setProperty(ORG_QUARTZ_JOBSTORE_USEPROPERTIES, 
conf.getString(ORG_QUARTZ_JOBSTORE_USEPROPERTIES, STRING_FALSE));
+            properties.setProperty(ORG_QUARTZ_THREADPOOL_CLASS, 
conf.getString(ORG_QUARTZ_THREADPOOL_CLASS, SimpleThreadPool.class.getName()));
+            properties.setProperty(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS, 
conf.getString(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS, STRING_TRUE));
+            properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADCOUNT, 
conf.getString(ORG_QUARTZ_THREADPOOL_THREADCOUNT, QUARTZ_THREADCOUNT));
+            properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADPRIORITY, 
conf.getString(ORG_QUARTZ_THREADPOOL_THREADPRIORITY, QUARTZ_THREADPRIORITY));
+            properties.setProperty(ORG_QUARTZ_JOBSTORE_CLASS, 
conf.getString(ORG_QUARTZ_JOBSTORE_CLASS, JobStoreTX.class.getName()));
+            properties.setProperty(ORG_QUARTZ_JOBSTORE_TABLEPREFIX, 
conf.getString(ORG_QUARTZ_JOBSTORE_TABLEPREFIX, QUARTZ_TABLE_PREFIX));
+            properties.setProperty(ORG_QUARTZ_JOBSTORE_ISCLUSTERED, 
conf.getString(ORG_QUARTZ_JOBSTORE_ISCLUSTERED, STRING_TRUE));
+            properties.setProperty(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD, 
conf.getString(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD, QUARTZ_MISFIRETHRESHOLD));
+            properties.setProperty(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL, 
conf.getString(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL, 
QUARTZ_CLUSTERCHECKININTERVAL));
+            
properties.setProperty(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK, 
conf.getString(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK, 
QUARTZ_ACQUIRETRIGGERSWITHINLOCK));
+            properties.setProperty(ORG_QUARTZ_JOBSTORE_DATASOURCE, 
conf.getString(ORG_QUARTZ_JOBSTORE_DATASOURCE, QUARTZ_DATASOURCE));
+            
properties.setProperty(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS, 
conf.getString(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS, 
DruidConnectionProvider.class.getName()));
+
+            schedulerFactory.initialize(properties);
+            scheduler = schedulerFactory.getScheduler();
+
+        } catch (SchedulerException e) {
+            logger.error(e.getMessage(), e);
+            System.exit(1);
+        }
+
     }
-  }
-
-
-  /**
-   * add task trigger , if this task already exists, return this task with 
updated trigger
-   *
-   * @param clazz             job class name
-   * @param jobName           job name
-   * @param jobGroupName      job group name
-   * @param startDate         job start date
-   * @param endDate           job end date
-   * @param cronExpression    cron expression
-   * @param jobDataMap        job parameters data map
-   */
-  public void addJob(Class<? extends Job> clazz,String jobName,String 
jobGroupName,Date startDate, Date endDate,
-                                 String cronExpression,
-                                 Map<String, Object> jobDataMap) {
-    lock.writeLock().lock();
-    try {
-
-      JobKey jobKey = new JobKey(jobName, jobGroupName);
-      JobDetail jobDetail;
-      //add a task (if this task already exists, return this task directly)
-      if (scheduler.checkExists(jobKey)) {
-
-        jobDetail = scheduler.getJobDetail(jobKey);
-        if (jobDataMap != null) {
-          jobDetail.getJobDataMap().putAll(jobDataMap);
+
+    /**
+     * Whether the scheduler has been started.
+     *
+     * @throws SchedulerException scheduler exception
+     */
+    public void start() throws SchedulerException {
+        if (!scheduler.isStarted()) {
+            scheduler.start();
+            logger.info("Quartz service started");
         }
-      } else {
-        jobDetail = newJob(clazz).withIdentity(jobKey).build();
+    }
 
-        if (jobDataMap != null) {
-          jobDetail.getJobDataMap().putAll(jobDataMap);
+    /**
+     * stop all scheduled tasks
+     * <p>
+     * Halts the Scheduler's firing of Triggers,
+     * and cleans up all resources associated with the Scheduler.
+     * <p>
+     * The scheduler cannot be re-started.
+     *
+     * @throws SchedulerException scheduler exception
+     */
+    public void shutdown() throws SchedulerException {
+        if (!scheduler.isShutdown()) {
+            // don't wait for the task to complete
+            scheduler.shutdown();
+            logger.info("Quartz service stopped, and halt all tasks");
         }
+    }
 
-        scheduler.addJob(jobDetail, false, true);
-
-        logger.info("Add job, job name: {}, group name: {}",
-                jobName, jobGroupName);
-      }
-
-      TriggerKey triggerKey = new TriggerKey(jobName, jobGroupName);
-      /**
-       * Instructs the Scheduler that upon a mis-fire
-       * situation, the CronTrigger wants to have it's
-       * next-fire-time updated to the next time in the schedule after the
-       * current time (taking into account any associated Calendar),
-       * but it does not want to be fired now.
-       */
-      CronTrigger cronTrigger = 
newTrigger().withIdentity(triggerKey).startAt(startDate).endAt(endDate)
-              
.withSchedule(cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing())
-              .forJob(jobDetail).build();
-
-      if (scheduler.checkExists(triggerKey)) {
-          // updateProcessInstance scheduler trigger when scheduler cycle 
changes
-          CronTrigger oldCronTrigger = (CronTrigger) 
scheduler.getTrigger(triggerKey);
-          String oldCronExpression = oldCronTrigger.getCronExpression();
-
-          if (!StringUtils.equalsIgnoreCase(cronExpression,oldCronExpression)) 
{
-            // reschedule job trigger
-            scheduler.rescheduleJob(triggerKey, cronTrigger);
-            logger.info("reschedule job trigger, triggerName: {}, 
triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
-                    jobName, jobGroupName, cronExpression, startDate, endDate);
-          }
-      } else {
-        scheduler.scheduleJob(cronTrigger);
-        logger.info("schedule job trigger, triggerName: {}, triggerGroupName: 
{}, cronExpression: {}, startDate: {}, endDate: {}",
-                jobName, jobGroupName, cronExpression, startDate, endDate);
-      }
-
-    } catch (Exception e) {
-      logger.error("add job failed", e);
-      throw new RuntimeException("add job failed", e);
-    } finally {
-      lock.writeLock().unlock();
+    /**
+     * add task trigger , if this task already exists, return this task with 
updated trigger
+     *
+     * @param clazz job class name
+     * @param jobName job name
+     * @param jobGroupName job group name
+     * @param startDate job start date
+     * @param endDate job end date
+     * @param cronExpression cron expression
+     * @param jobDataMap job parameters data map
+     */
+    public void addJob(Class<? extends Job> clazz, String jobName, String 
jobGroupName, Date startDate, Date endDate,
+                       String cronExpression,
+                       Map<String, Object> jobDataMap) {
+        lock.writeLock().lock();
+        try {
+
+            JobKey jobKey = new JobKey(jobName, jobGroupName);
+            JobDetail jobDetail;
+            //add a task (if this task already exists, return this task 
directly)
+            if (scheduler.checkExists(jobKey)) {
+
+                jobDetail = scheduler.getJobDetail(jobKey);
+                if (jobDataMap != null) {
+                    jobDetail.getJobDataMap().putAll(jobDataMap);
+                }
+            } else {
+                jobDetail = newJob(clazz).withIdentity(jobKey).build();
+
+                if (jobDataMap != null) {
+                    jobDetail.getJobDataMap().putAll(jobDataMap);
+                }
+
+                scheduler.addJob(jobDetail, false, true);
+
+                logger.info("Add job, job name: {}, group name: {}",
+                        jobName, jobGroupName);
+            }
+
+            TriggerKey triggerKey = new TriggerKey(jobName, jobGroupName);
+            /**
+             * Instructs the Scheduler that upon a mis-fire
+             * situation, the CronTrigger wants to have it's
+             * next-fire-time updated to the next time in the schedule after 
the
+             * current time (taking into account any associated Calendar),
+             * but it does not want to be fired now.
+             */
+            CronTrigger cronTrigger = 
newTrigger().withIdentity(triggerKey).startAt(startDate).endAt(endDate)
+                    
.withSchedule(cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing())
+                    .forJob(jobDetail).build();
+
+            if (scheduler.checkExists(triggerKey)) {
+                // updateProcessInstance scheduler trigger when scheduler 
cycle changes
+                CronTrigger oldCronTrigger = (CronTrigger) 
scheduler.getTrigger(triggerKey);
+                String oldCronExpression = oldCronTrigger.getCronExpression();
+
+                if (!StringUtils.equalsIgnoreCase(cronExpression, 
oldCronExpression)) {
+                    // reschedule job trigger
+                    scheduler.rescheduleJob(triggerKey, cronTrigger);
+                    logger.info("reschedule job trigger, triggerName: {}, 
triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
+                            jobName, jobGroupName, cronExpression, startDate, 
endDate);
+                }
+            } else {
+                scheduler.scheduleJob(cronTrigger);
+                logger.info("schedule job trigger, triggerName: {}, 
triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
+                        jobName, jobGroupName, cronExpression, startDate, 
endDate);
+            }
+
+        } catch (Exception e) {
+            throw new ServiceException("add job failed", e);
+        } finally {
+            lock.writeLock().unlock();
+        }
     }
-  }
-
-
-  /**
-   * delete job
-   *
-   * @param jobName      job name
-   * @param jobGroupName job group name
-   * @return true if the Job was found and deleted.
-   */
-  public boolean deleteJob(String jobName, String jobGroupName) {
-    lock.writeLock().lock();
-    try {
-      JobKey jobKey = new JobKey(jobName,jobGroupName);
-      if(scheduler.checkExists(jobKey)){
-        logger.info("try to delete job, job name: {}, job group name: {},", 
jobName, jobGroupName);
-        return scheduler.deleteJob(jobKey);
-      }else {
-        return true;
-      }
-
-    } catch (SchedulerException e) {
-      logger.error("delete job : {} failed",jobName, e);
-    } finally {
-      lock.writeLock().unlock();
+
+    /**
+     * delete job
+     *
+     * @param jobName job name
+     * @param jobGroupName job group name
+     * @return true if the Job was found and deleted.
+     */
+    public boolean deleteJob(String jobName, String jobGroupName) {
+        lock.writeLock().lock();
+        try {
+            JobKey jobKey = new JobKey(jobName, jobGroupName);
+            if (scheduler.checkExists(jobKey)) {
+                logger.info("try to delete job, job name: {}, job group name: 
{},", jobName, jobGroupName);
+                return scheduler.deleteJob(jobKey);
+            } else {
+                return true;
+            }
+
+        } catch (SchedulerException e) {
+            logger.error("delete job : {} failed", jobName, e);
+        } finally {
+            lock.writeLock().unlock();
+        }
+        return false;
+    }
+
+    /**
+     * delete all jobs in job group
+     *
+     * @param jobGroupName job group name
+     * @return true if all of the Jobs were found and deleted, false if
+     * one or more were not deleted.
+     */
+    public boolean deleteAllJobs(String jobGroupName) {
+        lock.writeLock().lock();
+        try {
+            logger.info("try to delete all jobs in job group: {}", 
jobGroupName);
+            List<JobKey> jobKeys = new ArrayList<>();
+            
jobKeys.addAll(scheduler.getJobKeys(GroupMatcher.groupEndsWith(jobGroupName)));
+
+            return scheduler.deleteJobs(jobKeys);
+        } catch (SchedulerException e) {
+            logger.error("delete all jobs in job group: {} failed", 
jobGroupName, e);
+        } finally {
+            lock.writeLock().unlock();
+        }
+        return false;
     }
-    return false;
-  }
-
-  /**
-   * delete all jobs in job group
-   *
-   * @param jobGroupName job group name
-   *
-   * @return true if all of the Jobs were found and deleted, false if
-   *      one or more were not deleted.
-   */
-  public boolean deleteAllJobs(String jobGroupName) {
-    lock.writeLock().lock();
-    try {
-      logger.info("try to delete all jobs in job group: {}", jobGroupName);
-      List<JobKey> jobKeys = new ArrayList<>();
-      
jobKeys.addAll(scheduler.getJobKeys(GroupMatcher.groupEndsWith(jobGroupName)));
-
-      return scheduler.deleteJobs(jobKeys);
-    } catch (SchedulerException e) {
-      logger.error("delete all jobs in job group: {} failed",jobGroupName, e);
-    } finally {
-      lock.writeLock().unlock();
+
+    /**
+     * build job name
+     *
+     * @param processId process id
+     * @return job name
+     */
+    public static String buildJobName(int processId) {
+        StringBuilder sb = new StringBuilder(30);
+        sb.append(QUARTZ_JOB_PRIFIX).append(UNDERLINE).append(processId);
+        return sb.toString();
+    }
+
+    /**
+     * build job group name
+     *
+     * @param projectId project id
+     * @return job group name
+     */
+    public static String buildJobGroupName(int projectId) {
+        StringBuilder sb = new StringBuilder(30);
+        sb.append(QUARTZ_JOB_GROUP_PRIFIX).append(UNDERLINE).append(projectId);
+        return sb.toString();
+    }
+
+    /**
+     * add params to map
+     *
+     * @param projectId project id
+     * @param scheduleId schedule id
+     * @param schedule schedule
+     * @return data map
+     */
+    public static Map<String, Object> buildDataMap(int projectId, int 
scheduleId, Schedule schedule) {
+        Map<String, Object> dataMap = new HashMap<>(3);
+        dataMap.put(PROJECT_ID, projectId);
+        dataMap.put(SCHEDULE_ID, scheduleId);
+        dataMap.put(SCHEDULE, JSONUtils.toJsonString(schedule));
+
+        return dataMap;
     }
-    return false;
-  }
-
-  /**
-   * build job name
-   * @param processId process id
-   * @return job name
-   */
-  public static String buildJobName(int processId) {
-    StringBuilder sb = new StringBuilder(30);
-    sb.append(QUARTZ_JOB_PRIFIX).append(UNDERLINE).append(processId);
-    return sb.toString();
-  }
-
-  /**
-   * build job group name
-   * @param projectId project id
-   * @return job group name
-   */
-  public static String buildJobGroupName(int projectId) {
-    StringBuilder sb = new StringBuilder(30);
-    sb.append(QUARTZ_JOB_GROUP_PRIFIX).append(UNDERLINE).append(projectId);
-    return sb.toString();
-  }
-
-  /**
-   * add params to map
-   *
-   * @param projectId   project id
-   * @param scheduleId  schedule id
-   * @param schedule    schedule
-   * @return data map
-   */
-  public static Map<String, Object> buildDataMap(int projectId, int 
scheduleId, Schedule schedule) {
-    Map<String, Object> dataMap = new HashMap<>(3);
-    dataMap.put(PROJECT_ID, projectId);
-    dataMap.put(SCHEDULE_ID, scheduleId);
-    dataMap.put(SCHEDULE, JSONUtils.toJsonString(schedule));
-
-    return dataMap;
-  }
 
 }
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java
index 0a2e31b..60c8623 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java
@@ -14,159 +14,177 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.service.quartz.cron;
 
+import org.apache.dolphinscheduler.common.enums.CycleEnum;
+
 import com.cronutils.model.Cron;
 import com.cronutils.model.field.CronField;
 import com.cronutils.model.field.CronFieldName;
-import com.cronutils.model.field.expression.*;
-import org.apache.dolphinscheduler.common.enums.CycleEnum;
+import com.cronutils.model.field.expression.Always;
+import com.cronutils.model.field.expression.And;
+import com.cronutils.model.field.expression.Between;
+import com.cronutils.model.field.expression.Every;
+import com.cronutils.model.field.expression.FieldExpression;
+import com.cronutils.model.field.expression.On;
 
 /**
  * Cycle
  */
 public abstract class AbstractCycle {
 
-  protected Cron cron;
-
-  protected CronField minField;
-  protected CronField hourField;
-  protected CronField dayOfMonthField;
-  protected CronField dayOfWeekField;
-  protected CronField monthField;
-  protected CronField yearField;
-
-  public CycleLinks addCycle(AbstractCycle cycle) {
-    return new CycleLinks(this.cron).addCycle(this).addCycle(cycle);
-  }
-
-  /**
-   * cycle constructor
-   * @param cron cron
-   */
-  public AbstractCycle(Cron cron) {
-    if (cron == null) {
-      throw new IllegalArgumentException("cron must not be null!");
+    protected Cron cron;
+
+    protected CronField minField;
+    protected CronField hourField;
+    protected CronField dayOfMonthField;
+    protected CronField dayOfWeekField;
+    protected CronField monthField;
+    protected CronField yearField;
+
+    public CycleLinks addCycle(AbstractCycle cycle) {
+        return new CycleLinks(this.cron).addCycle(this).addCycle(cycle);
+    }
+
+    /**
+     * cycle constructor
+     *
+     * @param cron cron
+     */
+    protected AbstractCycle(Cron cron) {
+        if (cron == null) {
+            throw new IllegalArgumentException("cron must not be null!");
+        }
+
+        this.cron = cron;
+        this.minField = cron.retrieve(CronFieldName.MINUTE);
+        this.hourField = cron.retrieve(CronFieldName.HOUR);
+        this.dayOfMonthField = cron.retrieve(CronFieldName.DAY_OF_MONTH);
+        this.dayOfWeekField = cron.retrieve(CronFieldName.DAY_OF_WEEK);
+        this.monthField = cron.retrieve(CronFieldName.MONTH);
+        this.yearField = cron.retrieve(CronFieldName.YEAR);
+    }
+
+    /**
+     * whether the minute field has a value
+     *
+     * @return if minute field has a value return true,else return false
+     */
+    protected boolean minFiledIsSetAll() {
+        FieldExpression minFieldExpression = minField.getExpression();
+        return (minFieldExpression instanceof Every || minFieldExpression 
instanceof Always
+                || minFieldExpression instanceof Between || minFieldExpression 
instanceof And
+                || minFieldExpression instanceof On);
+    }
+
+    /**
+     * whether the minute field has a value of every or always
+     *
+     * @return if minute field has a value of every or always return true,else 
return false
+     */
+    protected boolean minFiledIsEvery() {
+        FieldExpression minFieldExpression = minField.getExpression();
+        return (minFieldExpression instanceof Every || minFieldExpression 
instanceof Always);
+    }
+
+    /**
+     * whether the hour field has a value
+     *
+     * @return if hour field has a value return true,else return false
+     */
+    protected boolean hourFiledIsSetAll() {
+        FieldExpression hourFieldExpression = hourField.getExpression();
+        return (hourFieldExpression instanceof Every || hourFieldExpression 
instanceof Always
+                || hourFieldExpression instanceof Between || 
hourFieldExpression instanceof And
+                || hourFieldExpression instanceof On);
+    }
+
+    /**
+     * whether the hour field has a value of every or always
+     *
+     * @return if hour field has a value of every or always return true,else 
return false
+     */
+    protected boolean hourFiledIsEvery() {
+        FieldExpression hourFieldExpression = hourField.getExpression();
+        return (hourFieldExpression instanceof Every || hourFieldExpression 
instanceof Always);
+    }
+
+    /**
+     * whether the day Of month field has a value
+     *
+     * @return if day Of month field has a value return true,else return false
+     */
+    protected boolean dayOfMonthFieldIsSetAll() {
+        return (dayOfMonthField.getExpression() instanceof Every || 
dayOfMonthField.getExpression() instanceof Always
+                || dayOfMonthField.getExpression() instanceof Between || 
dayOfMonthField.getExpression() instanceof And
+                || dayOfMonthField.getExpression() instanceof On);
+    }
+
+    /**
+     * whether the day Of Month field has a value of every or always
+     *
+     * @return if day Of Month field has a value of every or always return 
true,else return false
+     */
+    protected boolean dayOfMonthFieldIsEvery() {
+        return (dayOfMonthField.getExpression() instanceof Every || 
dayOfMonthField.getExpression() instanceof Always);
+    }
+
+    /**
+     * whether month field has a value
+     *
+     * @return if month field has a value return true,else return false
+     */
+    protected boolean monthFieldIsSetAll() {
+        FieldExpression monthFieldExpression = monthField.getExpression();
+        return (monthFieldExpression instanceof Every || monthFieldExpression 
instanceof Always
+                || monthFieldExpression instanceof Between || 
monthFieldExpression instanceof And
+                || monthFieldExpression instanceof On);
+    }
+
+    /**
+     * whether the month field has a value of every or always
+     *
+     * @return if  month field has a value of every or always return true,else 
return false
+     */
+    protected boolean monthFieldIsEvery() {
+        FieldExpression monthFieldExpression = monthField.getExpression();
+        return (monthFieldExpression instanceof Every || monthFieldExpression 
instanceof Always);
+    }
+
+    /**
+     * whether the day Of week field has a value
+     *
+     * @return if day Of week field has a value return true,else return false
+     */
+    protected boolean dayofWeekFieldIsSetAll() {
+        FieldExpression dayOfWeekFieldExpression = 
dayOfWeekField.getExpression();
+        return (dayOfWeekFieldExpression instanceof Every || 
dayOfWeekFieldExpression instanceof Always
+                || dayOfWeekFieldExpression instanceof Between || 
dayOfWeekFieldExpression instanceof And
+                || dayOfWeekFieldExpression instanceof On);
+    }
+
+    /**
+     * whether the day Of week field has a value of every or always
+     *
+     * @return if day Of week field has a value of every or always return 
true,else return false
+     */
+    protected boolean dayofWeekFieldIsEvery() {
+        FieldExpression dayOfWeekFieldExpression = 
dayOfWeekField.getExpression();
+        return (dayOfWeekFieldExpression instanceof Every || 
dayOfWeekFieldExpression instanceof Always);
     }
 
-    this.cron = cron;
-    this.minField = cron.retrieve(CronFieldName.MINUTE);
-    this.hourField = cron.retrieve(CronFieldName.HOUR);
-    this.dayOfMonthField = cron.retrieve(CronFieldName.DAY_OF_MONTH);
-    this.dayOfWeekField = cron.retrieve(CronFieldName.DAY_OF_WEEK);
-    this.monthField = cron.retrieve(CronFieldName.MONTH);
-    this.yearField = cron.retrieve(CronFieldName.YEAR);
-  }
-
-  /**
-   * whether the minute field has a value
-   * @return if minute field has a value return true,else return false
-   */
-  protected boolean minFiledIsSetAll(){
-    FieldExpression minFieldExpression = minField.getExpression();
-    return (minFieldExpression instanceof Every || minFieldExpression 
instanceof Always
-            || minFieldExpression instanceof Between || minFieldExpression 
instanceof And
-            || minFieldExpression instanceof On);
-  }
-
-
-  /**
-   * whether the minute field has a value of every or always
-   * @return if minute field has a value of every or always return true,else 
return false
-   */
-  protected boolean minFiledIsEvery(){
-    FieldExpression minFieldExpression = minField.getExpression();
-    return (minFieldExpression instanceof Every || minFieldExpression 
instanceof Always);
-  }
-
-  /**
-   * whether the hour field has a value
-   * @return if hour field has a value return true,else return false
-   */
-  protected boolean hourFiledIsSetAll(){
-    FieldExpression hourFieldExpression = hourField.getExpression();
-    return (hourFieldExpression instanceof Every || hourFieldExpression 
instanceof Always
-            || hourFieldExpression instanceof Between || hourFieldExpression 
instanceof And
-            || hourFieldExpression instanceof On);
-  }
-
-  /**
-   * whether the hour field has a value of every or always
-   * @return if hour field has a value of every or always return true,else 
return false
-   */
-  protected boolean hourFiledIsEvery(){
-    FieldExpression hourFieldExpression = hourField.getExpression();
-    return (hourFieldExpression instanceof Every || hourFieldExpression 
instanceof Always);
-  }
-
-  /**
-   * whether the day Of month field has a value
-   * @return if day Of month field has a value return true,else return false
-   */
-  protected boolean dayOfMonthFieldIsSetAll(){
-    return (dayOfMonthField.getExpression() instanceof Every || 
dayOfMonthField.getExpression() instanceof Always
-            || dayOfMonthField.getExpression() instanceof Between || 
dayOfMonthField.getExpression() instanceof And
-            || dayOfMonthField.getExpression() instanceof On);
-  }
-
-
-  /**
-   * whether the day Of Month field has a value of every or always
-   * @return if day Of Month field has a value of every or always return 
true,else return false
-   */
-  protected boolean dayOfMonthFieldIsEvery(){
-    return (dayOfMonthField.getExpression() instanceof Every || 
dayOfMonthField.getExpression() instanceof Always);
-  }
-
-  /**
-   * whether month field has a value
-   * @return if month field has a value return true,else return false
-   */
-  protected boolean monthFieldIsSetAll(){
-    FieldExpression monthFieldExpression = monthField.getExpression();
-    return (monthFieldExpression instanceof Every || monthFieldExpression 
instanceof Always
-            || monthFieldExpression instanceof Between || monthFieldExpression 
instanceof And
-            || monthFieldExpression instanceof On);
-  }
-
-  /**
-   * whether the month field has a value of every or always
-   * @return if  month field has a value of every or always return true,else 
return false
-   */
-  protected boolean monthFieldIsEvery(){
-    FieldExpression monthFieldExpression = monthField.getExpression();
-    return (monthFieldExpression instanceof Every || monthFieldExpression 
instanceof Always);
-  }
-
-  /**
-   * whether the day Of week field has a value
-   * @return if day Of week field has a value return true,else return false
-   */
-  protected boolean dayofWeekFieldIsSetAll(){
-    FieldExpression dayOfWeekFieldExpression = dayOfWeekField.getExpression();
-    return (dayOfWeekFieldExpression instanceof Every || 
dayOfWeekFieldExpression instanceof Always
-            || dayOfWeekFieldExpression instanceof Between || 
dayOfWeekFieldExpression instanceof And
-            || dayOfWeekFieldExpression instanceof On);
-  }
-
-  /**
-   * whether the day Of week field has a value of every or always
-   * @return if day Of week field has a value of every or always return 
true,else return false
-   */
-  protected boolean dayofWeekFieldIsEvery(){
-    FieldExpression dayOfWeekFieldExpression = dayOfWeekField.getExpression();
-    return (dayOfWeekFieldExpression instanceof Every || 
dayOfWeekFieldExpression instanceof Always);
-  }
-
-  /**
-   * get cycle enum
-   * @return CycleEnum
-   */
-  protected abstract CycleEnum getCycle();
-
-  /**
-   * get mini level cycle enum
-   * @return CycleEnum
-   */
-  protected abstract CycleEnum getMiniCycle();
+    /**
+     * get cycle enum
+     *
+     * @return CycleEnum
+     */
+    protected abstract CycleEnum getCycle();
+
+    /**
+     * get mini level cycle enum
+     *
+     * @return CycleEnum
+     */
+    protected abstract CycleEnum getMiniCycle();
 }
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
index 8a7d891..37d8f10 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
@@ -14,322 +14,329 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.service.zk;
 
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.COLON;
+import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
+import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
+import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
+
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ZKNodeType;
 import org.apache.dolphinscheduler.common.model.Server;
 import org.apache.dolphinscheduler.common.utils.ResInfo;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
+
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
-import java.util.*;
-
-import static org.apache.dolphinscheduler.common.Constants.*;
-
 /**
  * abstract zookeeper client
  */
 @Component
 public abstract class AbstractZKClient extends ZookeeperCachedOperator {
 
-       private static final Logger logger = 
LoggerFactory.getLogger(AbstractZKClient.class);
-
-
-       /**
-        *  remove dead server by host
-        * @param host host
-        * @param serverType serverType
-        * @throws Exception
-        */
-       public void removeDeadServerByHost(String host, String serverType) 
throws Exception {
-               List<String> deadServers = 
super.getChildrenKeys(getDeadZNodeParentPath());
-               for(String serverPath : deadServers){
-                       if(serverPath.startsWith(serverType+UNDERLINE+host)){
-                               String server = getDeadZNodeParentPath() + 
SINGLE_SLASH + serverPath;
-                               super.remove(server);
-                               logger.info("{} server {} deleted from zk dead 
server path success" , serverType , host);
-                       }
-               }
-       }
-
-
-       /**
-        * opType(add): if find dead server , then add to zk deadServerPath
-        * opType(delete): delete path from zk
-        *
-        * @param zNode                   node path
-        * @param zkNodeType      master or worker
-        * @param opType                  delete or add
-        * @throws Exception errors
-        */
-       public void handleDeadServer(String zNode, ZKNodeType zkNodeType, 
String opType) throws Exception {
-               String host = getHostByEventDataPath(zNode);
-               String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX 
: WORKER_PREFIX;
-
-               //check server restart, if restart , dead server path in zk 
should be delete
-               if(opType.equals(DELETE_ZK_OP)){
-                       removeDeadServerByHost(host, type);
-
-               }else if(opType.equals(ADD_ZK_OP)){
-                       String deadServerPath = getDeadZNodeParentPath() + 
SINGLE_SLASH + type + UNDERLINE + host;
-                       if(!super.isExisted(deadServerPath)){
-                               //add dead server info to zk dead server path : 
/dead-servers/
-
-                               super.persist(deadServerPath,(type + UNDERLINE 
+ host));
-
-                               logger.info("{} server dead , and {} added to 
zk dead server path success" ,
-                                               zkNodeType.toString(), zNode);
-                       }
-               }
-
-       }
-
-       /**
-        * get active master num
-        * @return active master number
-        */
-       public int getActiveMasterNum(){
-               List<String> childrenList = new ArrayList<>();
-               try {
-                       // read master node parent path from conf
-                       
if(super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))){
-                               childrenList = 
super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER));
-                       }
-               } catch (Exception e) {
-                       logger.error("getActiveMasterNum error",e);
-               }
-               return childrenList.size();
-       }
-
-       /**
-        *
-        * @return zookeeper quorum
-        */
-       public String getZookeeperQuorum(){
-               return getZookeeperConfig().getServerList();
-       }
-
-       /**
-        * get server list.
-        * @param zkNodeType zookeeper node type
-        * @return server list
-        */
-       public List<Server> getServersList(ZKNodeType zkNodeType){
-               Map<String, String> masterMap = getServerMaps(zkNodeType);
-               String parentPath = getZNodeParentPath(zkNodeType);
-
-               List<Server> masterServers = new ArrayList<>();
-               for (Map.Entry<String, String> entry : masterMap.entrySet()) {
-                       Server masterServer = 
ResInfo.parseHeartbeatForZKInfo(entry.getValue());
-                       if(masterServer == null){
-                               continue;
-                       }
-                       String key = entry.getKey();
-                       masterServer.setZkDirectory(parentPath + "/"+ key);
-                       //set host and port
-                       String[] hostAndPort=key.split(COLON);
-                       String[] hosts=hostAndPort[0].split(DIVISION_STRING);
-                       // fetch the last one
-                       masterServer.setHost(hosts[hosts.length-1]);
-                       masterServer.setPort(Integer.parseInt(hostAndPort[1]));
-                       masterServers.add(masterServer);
-               }
-               return masterServers;
-       }
-
-       /**
-        * get master server list map.
-        * @param zkNodeType zookeeper node type
-        * @return result : {host : resource info}
-        */
-       public Map<String, String> getServerMaps(ZKNodeType zkNodeType){
-
-               Map<String, String> masterMap = new HashMap<>();
-               try {
-                       String path =  getZNodeParentPath(zkNodeType);
-                       List<String> serverList  = super.getChildrenKeys(path);
-                       if(zkNodeType == ZKNodeType.WORKER){
-                           List<String> workerList = new ArrayList<>();
-                           for(String group : serverList){
-                               List<String> groupServers = 
super.getChildrenKeys(path + Constants.SLASH + group);
-                               for(String groupServer : groupServers){
-                                       workerList.add(group + Constants.SLASH 
+ groupServer);
-                                       }
-                               }
-                               serverList = workerList;
-                       }
-                       for(String server : serverList){
-                               masterMap.putIfAbsent(server, super.get(path + 
Constants.SLASH + server));
-                       }
-               } catch (Exception e) {
-                       logger.error("get server list failed", e);
-               }
-
-               return masterMap;
-       }
-
-       /**
-        * check the zookeeper node already exists
-        * @param host host
-        * @param zkNodeType zookeeper node type
-        * @return true if exists
-        */
-       public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) {
-               String path = getZNodeParentPath(zkNodeType);
-               if(StringUtils.isEmpty(path)){
-                       logger.error("check zk node exists error, host:{}, zk 
node type:{}",
-                                       host, zkNodeType.toString());
-                       return false;
-               }
-               Map<String, String> serverMaps = getServerMaps(zkNodeType);
-               for(String hostKey : serverMaps.keySet()){
-                       if(hostKey.contains(host)){
-                               return true;
-                       }
-               }
-               return false;
-       }
-
-       /**
-        *
-        * @return get worker node parent path
-        */
-       protected String getWorkerZNodeParentPath(){
-               return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
-       }
-
-       /**
-        *
-        * @return get master node parent path
-        */
-       protected String getMasterZNodeParentPath(){
-               return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS;
-       }
-
-       /**
-        *
-        * @return get master lock path
-        */
-       public String getMasterLockPath(){
-               return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS;
-       }
-
-       /**
-        *
-        * @param zkNodeType zookeeper node type
-        * @return get zookeeper node parent path
-        */
-       public String getZNodeParentPath(ZKNodeType zkNodeType) {
-               String path = "";
-               switch (zkNodeType){
-                       case MASTER:
-                               return getMasterZNodeParentPath();
-                       case WORKER:
-                               return getWorkerZNodeParentPath();
-                       case DEAD_SERVER:
-                               return getDeadZNodeParentPath();
-                       default:
-                               break;
-               }
-               return path;
-       }
-
-       /**
-        *
-        * @return get dead server node parent path
-        */
-       protected String getDeadZNodeParentPath(){
-               return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
-       }
-
-       /**
-        *
-        * @return get master start up lock path
-        */
-       public String getMasterStartUpLockPath(){
-               return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
-       }
-
-       /**
-        *
-        * @return get master failover lock path
-        */
-       public String getMasterFailoverLockPath(){
-               return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
-       }
-
-       /**
-        *
-        * @return get worker failover lock path
-        */
-       public String getWorkerFailoverLockPath(){
-               return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
-       }
-
-       /**
-        * release mutex
-        * @param mutex mutex
-        */
-       public void releaseMutex(InterProcessMutex mutex) {
-               if (mutex != null){
-                       try {
-                               mutex.release();
-                       } catch (Exception e) {
-                               if("instance must be started before calling 
this method".equals(e.getMessage())){
-                                       logger.warn("lock release");
-                               }else{
-                                       logger.error("lock release failed",e);
-                               }
-
-                       }
-               }
-       }
-
-       /**
-        *  init system znode
-        */
-       protected void initSystemZNode(){
-               try {
-                       persist(getMasterZNodeParentPath(), "");
-                       persist(getWorkerZNodeParentPath(), "");
-                       persist(getDeadZNodeParentPath(), "");
-
-                       logger.info("initialize server nodes success.");
-               } catch (Exception e) {
-                       logger.error("init system znode failed",e);
-               }
-       }
-
-       /**
-        *  get host ip, string format: masterParentPath/ip
-        * @param path path
-        * @return host ip, string format: masterParentPath/ip
-        */
-       protected String getHostByEventDataPath(String path) {
-               if(StringUtils.isEmpty(path)){
-                       logger.error("empty path!");
-                       return "";
-               }
-               String[] pathArray = path.split(SINGLE_SLASH);
-               if(pathArray.length < 1){
-                       logger.error("parse ip error: {}", path);
-                       return "";
-               }
-               return pathArray[pathArray.length - 1];
-
-       }
-
-       @Override
-       public String toString() {
-               return "AbstractZKClient{" +
-                               "zkClient=" + getZkClient() +
-                               ", deadServerZNodeParentPath='" + 
getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\'' +
-                               ", masterZNodeParentPath='" + 
getZNodeParentPath(ZKNodeType.MASTER) + '\'' +
-                               ", workerZNodeParentPath='" + 
getZNodeParentPath(ZKNodeType.WORKER) + '\'' +
-                               '}';
-       }
-}
\ No newline at end of file
+    private static final Logger logger = 
LoggerFactory.getLogger(AbstractZKClient.class);
+
+    /**
+     * remove dead server by host
+     *
+     * @param host host
+     * @param serverType serverType
+     */
+    public void removeDeadServerByHost(String host, String serverType) {
+        List<String> deadServers = 
super.getChildrenKeys(getDeadZNodeParentPath());
+        for (String serverPath : deadServers) {
+            if (serverPath.startsWith(serverType + UNDERLINE + host)) {
+                String server = getDeadZNodeParentPath() + SINGLE_SLASH + 
serverPath;
+                super.remove(server);
+                logger.info("{} server {} deleted from zk dead server path 
success", serverType, host);
+            }
+        }
+    }
+
+    /**
+     * opType(add): if find dead server , then add to zk deadServerPath
+     * opType(delete): delete path from zk
+     *
+     * @param zNode node path
+     * @param zkNodeType master or worker
+     * @param opType delete or add
+     */
+    public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String 
opType) {
+        String host = getHostByEventDataPath(zNode);
+        String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : 
WORKER_PREFIX;
+
+        //check server restart, if restart , dead server path in zk should be 
delete
+        if (opType.equals(DELETE_ZK_OP)) {
+            removeDeadServerByHost(host, type);
+
+        } else if (opType.equals(ADD_ZK_OP)) {
+            String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + 
type + UNDERLINE + host;
+            if (!super.isExisted(deadServerPath)) {
+                //add dead server info to zk dead server path : /dead-servers/
+
+                super.persist(deadServerPath, (type + UNDERLINE + host));
+
+                logger.info("{} server dead , and {} added to zk dead server 
path success",
+                        zkNodeType, zNode);
+            }
+        }
+
+    }
+
+    /**
+     * get active master num
+     *
+     * @return active master number
+     */
+    public int getActiveMasterNum() {
+        List<String> childrenList = new ArrayList<>();
+        try {
+            // read master node parent path from conf
+            if (super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))) {
+                childrenList = 
super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER));
+            }
+        } catch (Exception e) {
+            logger.error("getActiveMasterNum error", e);
+        }
+        return childrenList.size();
+    }
+
+    /**
+     * @return zookeeper quorum
+     */
+    public String getZookeeperQuorum() {
+        return getZookeeperConfig().getServerList();
+    }
+
+    /**
+     * get server list.
+     *
+     * @param zkNodeType zookeeper node type
+     * @return server list
+     */
+    public List<Server> getServersList(ZKNodeType zkNodeType) {
+        Map<String, String> masterMap = getServerMaps(zkNodeType);
+        String parentPath = getZNodeParentPath(zkNodeType);
+
+        List<Server> masterServers = new ArrayList<>();
+        for (Map.Entry<String, String> entry : masterMap.entrySet()) {
+            Server masterServer = 
ResInfo.parseHeartbeatForZKInfo(entry.getValue());
+            if (masterServer == null) {
+                continue;
+            }
+            String key = entry.getKey();
+            masterServer.setZkDirectory(parentPath + "/" + key);
+            //set host and port
+            String[] hostAndPort = key.split(COLON);
+            String[] hosts = hostAndPort[0].split(DIVISION_STRING);
+            // fetch the last one
+            masterServer.setHost(hosts[hosts.length - 1]);
+            masterServer.setPort(Integer.parseInt(hostAndPort[1]));
+            masterServers.add(masterServer);
+        }
+        return masterServers;
+    }
+
+    /**
+     * get master server list map.
+     *
+     * @param zkNodeType zookeeper node type
+     * @return result : {host : resource info}
+     */
+    public Map<String, String> getServerMaps(ZKNodeType zkNodeType) {
+
+        Map<String, String> masterMap = new HashMap<>();
+        try {
+            String path = getZNodeParentPath(zkNodeType);
+            List<String> serverList = super.getChildrenKeys(path);
+            if (zkNodeType == ZKNodeType.WORKER) {
+                List<String> workerList = new ArrayList<>();
+                for (String group : serverList) {
+                    List<String> groupServers = super.getChildrenKeys(path + 
Constants.SLASH + group);
+                    for (String groupServer : groupServers) {
+                        workerList.add(group + Constants.SLASH + groupServer);
+                    }
+                }
+                serverList = workerList;
+            }
+            for (String server : serverList) {
+                masterMap.putIfAbsent(server, super.get(path + Constants.SLASH 
+ server));
+            }
+        } catch (Exception e) {
+            logger.error("get server list failed", e);
+        }
+
+        return masterMap;
+    }
+
+    /**
+     * check the zookeeper node already exists
+     *
+     * @param host host
+     * @param zkNodeType zookeeper node type
+     * @return true if exists
+     */
+    public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) {
+        String path = getZNodeParentPath(zkNodeType);
+        if (StringUtils.isEmpty(path)) {
+            logger.error("check zk node exists error, host:{}, zk node 
type:{}",
+                    host, zkNodeType);
+            return false;
+        }
+        Map<String, String> serverMaps = getServerMaps(zkNodeType);
+        for (String hostKey : serverMaps.keySet()) {
+            if (hostKey.contains(host)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * @return get worker node parent path
+     */
+    protected String getWorkerZNodeParentPath() {
+        return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
+    }
+
+    /**
+     * @return get master node parent path
+     */
+    protected String getMasterZNodeParentPath() {
+        return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS;
+    }
+
+    /**
+     * @return get master lock path
+     */
+    public String getMasterLockPath() {
+        return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS;
+    }
+
+    /**
+     * @param zkNodeType zookeeper node type
+     * @return get zookeeper node parent path
+     */
+    public String getZNodeParentPath(ZKNodeType zkNodeType) {
+        String path = "";
+        switch (zkNodeType) {
+            case MASTER:
+                return getMasterZNodeParentPath();
+            case WORKER:
+                return getWorkerZNodeParentPath();
+            case DEAD_SERVER:
+                return getDeadZNodeParentPath();
+            default:
+                break;
+        }
+        return path;
+    }
+
+    /**
+     * @return get dead server node parent path
+     */
+    protected String getDeadZNodeParentPath() {
+        return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
+    }
+
+    /**
+     * @return get master start up lock path
+     */
+    public String getMasterStartUpLockPath() {
+        return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
+    }
+
+    /**
+     * @return get master failover lock path
+     */
+    public String getMasterFailoverLockPath() {
+        return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
+    }
+
+    /**
+     * @return get worker failover lock path
+     */
+    public String getWorkerFailoverLockPath() {
+        return getZookeeperConfig().getDsRoot() + 
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
+    }
+
+    /**
+     * release mutex
+     *
+     * @param mutex mutex
+     */
+    public void releaseMutex(InterProcessMutex mutex) {
+        if (mutex != null) {
+            try {
+                mutex.release();
+            } catch (Exception e) {
+                if ("instance must be started before calling this 
method".equals(e.getMessage())) {
+                    logger.warn("lock release");
+                } else {
+                    logger.error("lock release failed", e);
+                }
+
+            }
+        }
+    }
+
+    /**
+     * init system znode
+     */
+    protected void initSystemZNode() {
+        try {
+            persist(getMasterZNodeParentPath(), "");
+            persist(getWorkerZNodeParentPath(), "");
+            persist(getDeadZNodeParentPath(), "");
+
+            logger.info("initialize server nodes success.");
+        } catch (Exception e) {
+            logger.error("init system znode failed", e);
+        }
+    }
+
+    /**
+     * get host ip, string format: masterParentPath/ip
+     *
+     * @param path path
+     * @return host ip, string format: masterParentPath/ip
+     */
+    protected String getHostByEventDataPath(String path) {
+        if (StringUtils.isEmpty(path)) {
+            logger.error("empty path!");
+            return "";
+        }
+        String[] pathArray = path.split(SINGLE_SLASH);
+        if (pathArray.length < 1) {
+            logger.error("parse ip error: {}", path);
+            return "";
+        }
+        return pathArray[pathArray.length - 1];
+
+    }
+
+    @Override
+    public String toString() {
+        return "AbstractZKClient{"
+                + "zkClient=" + getZkClient()
+                + ", deadServerZNodeParentPath='" + 
getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\''
+                + ", masterZNodeParentPath='" + 
getZNodeParentPath(ZKNodeType.MASTER) + '\''
+                + ", workerZNodeParentPath='" + 
getZNodeParentPath(ZKNodeType.WORKER) + '\''
+                + '}';
+    }
+}
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java
index 5a04c5a..e25a22f 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java
@@ -14,9 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.service.zk;
 
-import org.apache.commons.lang.StringUtils;
+import static 
org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
+
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
@@ -25,18 +30,16 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static 
org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
-
 /**
  * Shared Curator zookeeper client
  */
@@ -49,7 +52,6 @@ public class CuratorZookeeperClient implements 
InitializingBean {
 
     private CuratorFramework zkClient;
 
-
     @Override
     public void afterPropertiesSet() throws Exception {
         this.zkClient = buildClient();
@@ -91,7 +93,7 @@ public class CuratorZookeeperClient implements 
InitializingBean {
             zkClient.blockUntilConnected(30, TimeUnit.SECONDS);
 
         } catch (final Exception ex) {
-            throw new RuntimeException(ex);
+            throw new ServiceException(ex);
         }
         return zkClient;
     }
@@ -123,4 +125,4 @@ public class CuratorZookeeperClient implements 
InitializingBean {
     public CuratorFramework getZkClient() {
         return zkClient;
     }
-}
\ No newline at end of file
+}
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
index c7a53eb..7ac23a3 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
@@ -14,19 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.service.zk;
 
 import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
+
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooKeeperServerMain;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * just speed experience version
@@ -51,10 +54,10 @@ public class ZKServer {
         ZKServer zkServer;
         if (args.length == 0) {
             zkServer = new ZKServer();
-        } else if (args.length == 1){
-            zkServer = new ZKServer(Integer.valueOf(args[0]), "");
+        } else if (args.length == 1) {
+            zkServer = new ZKServer(Integer.parseInt(args[0]), "");
         } else {
-            zkServer = new ZKServer(Integer.valueOf(args[0]), args[1]);
+            zkServer = new ZKServer(Integer.parseInt(args[0]), args[1]);
         }
         zkServer.registerHook();
         zkServer.start();
@@ -73,7 +76,7 @@ public class ZKServer {
     }
 
     private void registerHook() {
-        /**
+        /*
          *  register hooks, which are called before the process exits
          */
         Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
@@ -90,7 +93,7 @@ public class ZKServer {
         }
     }
 
-    public boolean isStarted(){
+    public boolean isStarted() {
         return isStarted.get();
     }
 
@@ -119,19 +122,19 @@ public class ZKServer {
         if (file.exists()) {
             logger.warn("The path of zk server exists");
         }
-        logger.info("zk server starting, data dir path:{}" , zkDataDir);
-        startLocalZkServer(port, zkDataDir, 
ZooKeeperServer.DEFAULT_TICK_TIME,"60");
+        logger.info("zk server starting, data dir path:{}", zkDataDir);
+        startLocalZkServer(port, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME, 
"60");
     }
 
     /**
      * Starts a local Zk instance
      *
-     * @param port        The port to listen on
+     * @param port The port to listen on
      * @param dataDirPath The path for the Zk data directory
-     * @param tickTime    zk tick time
-     * @param maxClientCnxns    zk max client connections
+     * @param tickTime zk tick time
+     * @param maxClientCnxns zk max client connections
      */
-    private void startLocalZkServer(final int port, final String 
dataDirPath,final int tickTime,String maxClientCnxns) {
+    private void startLocalZkServer(final int port, final String dataDirPath, 
final int tickTime, String maxClientCnxns) {
         if (isStarted.compareAndSet(false, true)) {
             zooKeeperServerMain = new PublicZooKeeperServerMain();
             logger.info("Zookeeper data path : {} ", dataDirPath);
@@ -144,8 +147,7 @@ public class ZKServer {
 
                 zooKeeperServerMain.initializeAndRun(args);
             } catch (QuorumPeerConfig.ConfigException | IOException e) {
-                logger.warn("Caught exception while starting ZK", e);
-                throw new RuntimeException(e);
+                throw new ServiceException("Caught exception while starting 
ZK", e);
             }
         }
     }
@@ -159,7 +161,7 @@ public class ZKServer {
             logger.info("zk server stopped");
 
         } catch (Exception e) {
-            logger.error("Failed to stop ZK ",e);
+            logger.error("Failed to stop ZK ", e);
         }
     }
 
@@ -180,8 +182,7 @@ public class ZKServer {
                     org.apache.commons.io.FileUtils.deleteDirectory(new 
File(dataDir));
                 }
             } catch (Exception e) {
-                logger.warn("Caught exception while stopping ZK server", e);
-                throw new RuntimeException(e);
+                throw new ServiceException("Caught exception while starting 
ZK", e);
             }
         }
     }
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
index 6dfce79..88c339b 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
@@ -14,21 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.service.zk;
 
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.TreeCache;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+
+import java.nio.charset.StandardCharsets;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
-import java.nio.charset.StandardCharsets;
-
 @Component
 public class ZookeeperCachedOperator extends ZookeeperOperator {
 
@@ -36,6 +39,7 @@ public class ZookeeperCachedOperator extends 
ZookeeperOperator {
 
 
     private TreeCache treeCache;
+
     /**
      * register a unified listener of /${dsRoot},
      */
@@ -59,14 +63,16 @@ public class ZookeeperCachedOperator extends 
ZookeeperOperator {
             treeCache.start();
         } catch (Exception e) {
             logger.error("add listener to zk path: {} failed", 
getZookeeperConfig().getDsRoot());
-            throw new RuntimeException(e);
+            throw new ServiceException(e);
         }
     }
 
     //for sub class
-    protected void dataChanged(final CuratorFramework client, final 
TreeCacheEvent event, final String path){}
+    protected void dataChanged(final CuratorFramework client, final 
TreeCacheEvent event, final String path) {
+        // Used by sub class
+    }
 
-    public String getFromCache(final String cachePath, final String key) {
+    public String getFromCache(final String key) {
         ChildData resultInCache = treeCache.getCurrentData(key);
         if (null != resultInCache) {
             return null == resultInCache.getData() ? null : new 
String(resultInCache.getData(), StandardCharsets.UTF_8);
@@ -74,11 +80,11 @@ public class ZookeeperCachedOperator extends 
ZookeeperOperator {
         return null;
     }
 
-    public TreeCache getTreeCache(final String cachePath) {
+    public TreeCache getTreeCache() {
         return treeCache;
     }
 
-    public void addListener(TreeCacheListener listener){
+    public void addListener(TreeCacheListener listener) {
         this.treeCache.getListenable().addListener(listener);
     }
 
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
index e7b049f..8a21983 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
@@ -14,13 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.service.zk;
 
-import org.apache.commons.lang.StringUtils;
+import static 
org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
+
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
-import org.apache.curator.framework.api.transaction.CuratorOp;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.CloseableUtils;
@@ -29,18 +33,16 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static 
org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
-
 /**
  * zk base operator
  */
@@ -64,19 +66,23 @@ public class ZookeeperOperator implements InitializingBean {
     /**
      * this method is for sub class,
      */
-    protected void registerListener(){}
+    protected void registerListener() {
+        // Used by sub class
+    }
 
-    protected void treeCacheStart(){}
+    protected void treeCacheStart() {
+        // Used by sub class
+    }
 
     public void initStateLister() {
         checkNotNull(zkClient);
 
         zkClient.getConnectionStateListenable().addListener((client, newState) 
-> {
-            if(newState == ConnectionState.LOST){
+            if (newState == ConnectionState.LOST) {
                 logger.error("connection lost from zookeeper");
-            } else if(newState == ConnectionState.RECONNECTED){
+            } else if (newState == ConnectionState.RECONNECTED) {
                 logger.info("reconnected to zookeeper");
-            } else if(newState == ConnectionState.SUSPENDED){
+            } else if (newState == ConnectionState.SUSPENDED) {
                 logger.warn("connection SUSPENDED to zookeeper");
             }
         });
@@ -85,7 +91,8 @@ public class ZookeeperOperator implements InitializingBean {
     private CuratorFramework buildClient() {
         logger.info("zookeeper registry center init, server lists is: {}.", 
zookeeperConfig.getServerList());
 
-        CuratorFrameworkFactory.Builder builder = 
CuratorFrameworkFactory.builder().ensembleProvider(new 
DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(),"zookeeper 
quorum can't be null")))
+        CuratorFrameworkFactory.Builder builder = 
CuratorFrameworkFactory.builder().ensembleProvider(new 
DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(),
+                "zookeeper quorum can't be null")))
                 .retryPolicy(new 
ExponentialBackoffRetry(zookeeperConfig.getBaseSleepTimeMs(), 
zookeeperConfig.getMaxRetries(), zookeeperConfig.getMaxSleepMs()));
 
         //these has default value
@@ -114,7 +121,7 @@ public class ZookeeperOperator implements InitializingBean {
         try {
             zkClient.blockUntilConnected();
         } catch (final Exception ex) {
-            throw new RuntimeException(ex);
+            throw new ServiceException(ex);
         }
         return zkClient;
     }
@@ -138,12 +145,12 @@ public class ZookeeperOperator implements 
InitializingBean {
             throw new IllegalStateException(ex);
         } catch (Exception ex) {
             logger.error("getChildrenKeys key : {}", key, ex);
-            throw new RuntimeException(ex);
+            throw new ServiceException(ex);
         }
     }
 
-    public boolean hasChildren(final String key){
-        Stat stat ;
+    public boolean hasChildren(final String key) {
+        Stat stat;
         try {
             stat = zkClient.checkExists().forPath(key);
             return stat.getNumChildren() >= 1;
@@ -241,4 +248,4 @@ public class ZookeeperOperator implements InitializingBean {
     public void close() {
         CloseableUtils.closeQuietly(zkClient);
     }
-}
\ No newline at end of file
+}

Reply via email to