This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new e083e28 [Improvement][service] Refactor service module to fix code
smell (#4513)
e083e28 is described below
commit e083e28720a5cd5f4b2c1f8916c26a6089e7fa4e
Author: Segun Ogundipe <[email protected]>
AuthorDate: Fri Jan 29 12:44:33 2021 +0100
[Improvement][service] Refactor service module to fix code smell (#4513)
* chore: Refactor code to fix code smell in service module
* chore: Add licence header to new files
* chore: Fix comment from code review
* chore[service]: Reduce the number of custom runtime exception to one
---
.../dolphinscheduler/common/utils/StringUtils.java | 4 +
.../service/bean/SpringApplicationContext.java | 5 +-
.../ServiceException.java} | 43 +-
.../service/permission/PermissionCheck.java | 48 +-
.../service/process/ProcessService.java | 307 +++++------
.../service/quartz/ProcessScheduleJob.java | 15 +-
.../service/quartz/QuartzExecutors.java | 614 +++++++++++----------
.../service/quartz/cron/AbstractCycle.java | 302 +++++-----
.../service/zk/AbstractZKClient.java | 611 ++++++++++----------
.../service/zk/CuratorZookeeperClient.java | 22 +-
.../dolphinscheduler/service/zk/ZKServer.java | 37 +-
.../service/zk/ZookeeperCachedOperator.java | 20 +-
.../service/zk/ZookeeperOperator.java | 45 +-
13 files changed, 1071 insertions(+), 1002 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
index 6e32d12..362c613 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
@@ -61,4 +61,8 @@ public class StringUtils {
public static String trim(String str) {
return str == null ? null : str.trim();
}
+
+ public static boolean equalsIgnoreCase(String str1, String str2) {
+ return str1 == null ? str2 == null : str1.equalsIgnoreCase(str2);
+ }
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
index ddf1fec..484b837 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.service.bean;
import org.springframework.beans.BeansException;
@@ -31,9 +32,7 @@ public class SpringApplicationContext implements
ApplicationContextAware {
SpringApplicationContext.applicationContext = applicationContext;
}
- public static <T> T getBean(Class<T> requiredType){
+ public static <T> T getBean(Class<T> requiredType) {
return applicationContext.getBean(requiredType);
}
-
-
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/ServiceException.java
similarity index 51%
copy from
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
copy to
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/ServiceException.java
index ddf1fec..4465970 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/ServiceException.java
@@ -14,26 +14,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.service.bean;
-import org.springframework.beans.BeansException;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationContextAware;
-import org.springframework.stereotype.Component;
+package org.apache.dolphinscheduler.service.exceptions;
-@Component
-public class SpringApplicationContext implements ApplicationContextAware {
-
- private static ApplicationContext applicationContext;
+/**
+ * Custom ZKServerException exception
+ */
+public class ServiceException extends RuntimeException {
- @Override
- public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
- SpringApplicationContext.applicationContext = applicationContext;
+ /**
+ * Construct a new runtime exception with the error message
+ *
+ * @param errMsg Error message
+ */
+ public ServiceException(String errMsg) {
+ super(errMsg);
}
- public static <T> T getBean(Class<T> requiredType){
- return applicationContext.getBean(requiredType);
+ /**
+ * Construct a new runtime exception with the cause
+ *
+ * @param cause cause
+ */
+ public ServiceException(Throwable cause) {
+ super(cause);
}
-
+ /**
+ * Construct a new runtime exception with the detail message and cause
+ *
+ * @param errMsg message
+ * @param cause cause
+ */
+ public ServiceException(String errMsg, Throwable cause) {
+ super(errMsg, cause);
+ }
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java
index 1a9295b..a8f73f0 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.service.permission;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
@@ -21,11 +22,13 @@ import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.slf4j.Logger;
import java.util.List;
+import org.slf4j.Logger;
+
public class PermissionCheck<T> {
/**
* logger
@@ -58,8 +61,9 @@ public class PermissionCheck<T> {
/**
* permission check
+ *
* @param authorizationType authorization type
- * @param processService process dao
+ * @param processService process dao
*/
public PermissionCheck(AuthorizationType authorizationType, ProcessService
processService) {
this.authorizationType = authorizationType;
@@ -68,10 +72,6 @@ public class PermissionCheck<T> {
/**
* permission check
- * @param authorizationType
- * @param processService
- * @param needChecks
- * @param userId
*/
public PermissionCheck(AuthorizationType authorizationType, ProcessService
processService, T[] needChecks, int userId) {
this.authorizationType = authorizationType;
@@ -82,11 +82,6 @@ public class PermissionCheck<T> {
/**
* permission check
- * @param authorizationType
- * @param processService
- * @param needChecks
- * @param userId
- * @param logger
*/
public PermissionCheck(AuthorizationType authorizationType, ProcessService
processService, T[] needChecks, int userId, Logger logger) {
this.authorizationType = authorizationType;
@@ -98,13 +93,8 @@ public class PermissionCheck<T> {
/**
* permission check
- * @param logger
- * @param authorizationType
- * @param processService
- * @param resourceList
- * @param userId
*/
- public PermissionCheck(AuthorizationType authorizationType, ProcessService
processService, List<ResourceInfo> resourceList, int userId,Logger logger) {
+ public PermissionCheck(AuthorizationType authorizationType, ProcessService
processService, List<ResourceInfo> resourceList, int userId, Logger logger) {
this.authorizationType = authorizationType;
this.processService = processService;
this.resourceList = resourceList;
@@ -154,9 +144,10 @@ public class PermissionCheck<T> {
/**
* has permission
+ *
* @return true if has permission
*/
- public boolean hasPermission(){
+ public boolean hasPermission() {
try {
checkPermission();
return true;
@@ -167,23 +158,24 @@ public class PermissionCheck<T> {
/**
* check permission
- * @throws Exception exception
+ *
+ * @throws ServiceException exception
*/
- public void checkPermission() throws Exception{
- if(this.needChecks.length > 0){
+ public void checkPermission() throws ServiceException {
+ if (this.needChecks.length > 0) {
// get user type in order to judge whether the user is admin
User user = processService.getUserById(userId);
if (user == null) {
- logger.error("user id {} didn't exist",userId);
- throw new RuntimeException(String.format("user %s didn't
exist",userId));
+ logger.error("user id {} doesn't exist", userId);
+ throw new ServiceException(String.format("user %s doesn't
exist", userId));
}
- if (user.getUserType() != UserType.ADMIN_USER){
- List<T> unauthorizedList =
processService.listUnauthorized(userId,needChecks,authorizationType);
+ if (user.getUserType() != UserType.ADMIN_USER) {
+ List<T> unauthorizedList =
processService.listUnauthorized(userId, needChecks, authorizationType);
// if exist unauthorized resource
- if(CollectionUtils.isNotEmpty(unauthorizedList)){
- logger.error("user {} didn't has permission of {}: {}",
user.getUserName(), authorizationType.getDescp(),unauthorizedList);
- throw new RuntimeException(String.format("user %s didn't
has permission of %s %s", user.getUserName(), authorizationType.getDescp(),
unauthorizedList.get(0)));
+ if (CollectionUtils.isNotEmpty(unauthorizedList)) {
+ logger.error("user {} doesn't have permission of {}: {}",
user.getUserName(), authorizationType.getDescp(), unauthorizedList);
+ throw new ServiceException(String.format("user %s doesn't
have permission of %s %s", user.getUserName(), authorizationType.getDescp(),
unauthorizedList.get(0)));
}
}
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index cfe649d..7c30509 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -84,6 +84,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
+import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -110,11 +111,11 @@ public class ProcessService {
private final Logger logger = LoggerFactory.getLogger(getClass());
- private final int[] stateArray = new int[]
{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
- ExecutionStatus.RUNNING_EXECUTION.ordinal(),
- ExecutionStatus.DELAY_EXECUTION.ordinal(),
- ExecutionStatus.READY_PAUSE.ordinal(),
- ExecutionStatus.READY_STOP.ordinal()};
+ private final int[] stateArray = new
int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
+ ExecutionStatus.RUNNING_EXECUTION.ordinal(),
+ ExecutionStatus.DELAY_EXECUTION.ordinal(),
+ ExecutionStatus.READY_PAUSE.ordinal(),
+ ExecutionStatus.READY_STOP.ordinal()};
@Autowired
private UserMapper userMapper;
@@ -158,16 +159,16 @@ public class ProcessService {
/**
* handle Command (construct ProcessInstance from Command) , wrapped in
transaction
*
- * @param logger logger
- * @param host host
+ * @param logger logger
+ * @param host host
* @param validThreadNum validThreadNum
- * @param command found command
+ * @param command found command
* @return process instance
*/
@Transactional(rollbackFor = Exception.class)
public ProcessInstance handleCommand(Logger logger, String host, int
validThreadNum, Command command) {
ProcessInstance processInstance = constructProcessInstance(command,
host);
- //cannot construct process instance, return null;
+ // cannot construct process instance, return null
if (processInstance == null) {
logger.error("scan command, command parameter is error: {}",
command);
moveToErrorCommand(command, "process instance is null");
@@ -201,7 +202,7 @@ public class ProcessService {
/**
* set process waiting thread
*
- * @param command command
+ * @param command command
* @param processInstance processInstance
* @return process instance
*/
@@ -219,7 +220,7 @@ public class ProcessService {
/**
* check thread num
*
- * @param command command
+ * @param command command
* @param validThreadNum validThreadNum
* @return if thread is enough
*/
@@ -259,7 +260,7 @@ public class ProcessService {
*/
public Boolean verifyIsNeedCreateCommand(Command command) {
Boolean isNeedCreate = true;
- Map<CommandType, Integer> cmdTypeMap = new HashMap<CommandType,
Integer>();
+ EnumMap<CommandType, Integer> cmdTypeMap = new
EnumMap<>(CommandType.class);
cmdTypeMap.put(CommandType.REPEAT_RUNNING, 1);
cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1);
cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS, 1);
@@ -296,9 +297,6 @@ public class ProcessService {
/**
* get task node list by definitionId
- *
- * @param defineId
- * @return
*/
public List<TaskNode> getTaskNodeListByDefinitionId(Integer defineId) {
ProcessDefinition processDefinition =
processDefineMapper.selectById(defineId);
@@ -425,7 +423,7 @@ public class ProcessService {
* recursive query sub process definition id by parent id.
*
* @param parentId parentId
- * @param ids ids
+ * @param ids ids
*/
public void recurseFindSubProcessId(int parentId, List<Integer> ids) {
ProcessDefinition processDefinition =
processDefineMapper.selectById(parentId);
@@ -435,7 +433,7 @@ public class ProcessService {
List<TaskNode> taskNodeList = processData.getTasks();
- if (taskNodeList != null && taskNodeList.size() > 0) {
+ if (taskNodeList != null && !taskNodeList.isEmpty()) {
for (TaskNode taskNode : taskNodeList) {
String parameter = taskNode.getParams();
@@ -456,7 +454,7 @@ public class ProcessService {
* create recovery waiting thread command and delete origin command at
the same time.
* if the recovery command is exists, only update the field update_time
*
- * @param originCommand originCommand
+ * @param originCommand originCommand
* @param processInstance processInstance
*/
public void createRecoveryWaitingThreadCommand(Command originCommand,
ProcessInstance processInstance) {
@@ -473,17 +471,17 @@ public class ProcessService {
// process instance quit by "waiting thread" state
if (originCommand == null) {
Command command = new Command(
- CommandType.RECOVER_WAITTING_THREAD,
- processInstance.getTaskDependType(),
- processInstance.getFailureStrategy(),
- processInstance.getExecutorId(),
- processInstance.getProcessDefinitionId(),
- JSONUtils.toJsonString(cmdParam),
- processInstance.getWarningType(),
- processInstance.getWarningGroupId(),
- processInstance.getScheduleTime(),
- processInstance.getWorkerGroup(),
- processInstance.getProcessInstancePriority()
+ CommandType.RECOVER_WAITTING_THREAD,
+ processInstance.getTaskDependType(),
+ processInstance.getFailureStrategy(),
+ processInstance.getExecutorId(),
+ processInstance.getProcessDefinitionId(),
+ JSONUtils.toJsonString(cmdParam),
+ processInstance.getWarningType(),
+ processInstance.getWarningGroupId(),
+ processInstance.getScheduleTime(),
+ processInstance.getWorkerGroup(),
+ processInstance.getProcessInstancePriority()
);
saveCommand(command);
return;
@@ -508,16 +506,14 @@ public class ProcessService {
/**
* get schedule time from command
*
- * @param command command
+ * @param command command
* @param cmdParam cmdParam map
* @return date
*/
private Date getScheduleTime(Command command, Map<String, String>
cmdParam) {
Date scheduleTime = command.getScheduleTime();
- if (scheduleTime == null) {
- if (cmdParam != null &&
cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
- scheduleTime =
DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
- }
+ if (scheduleTime == null && cmdParam != null &&
cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
+ scheduleTime =
DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
}
return scheduleTime;
}
@@ -526,8 +522,8 @@ public class ProcessService {
* generate a new work process instance from command.
*
* @param processDefinition processDefinition
- * @param command command
- * @param cmdParam cmdParam map
+ * @param command command
+ * @param cmdParam cmdParam map
* @return process instance
*/
private ProcessInstance generateNewProcessInstance(ProcessDefinition
processDefinition,
@@ -580,10 +576,10 @@ public class ProcessService {
// curing global params
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- getCommandTypeIfComplement(processInstance, command),
- processInstance.getScheduleTime()));
+ processDefinition.getGlobalParamMap(),
+ processDefinition.getGlobalParamList(),
+ getCommandTypeIfComplement(processInstance, command),
+ processInstance.getScheduleTime()));
//copy process define json to process instance
processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson());
@@ -603,7 +599,7 @@ public class ProcessService {
* use definition creator's tenant.
*
* @param tenantId tenantId
- * @param userId userId
+ * @param userId userId
* @return tenant
*/
public Tenant getTenantForProcess(int tenantId, int userId) {
@@ -626,15 +622,15 @@ public class ProcessService {
/**
* check command parameters is valid
*
- * @param command command
+ * @param command command
* @param cmdParam cmdParam map
* @return whether command param is valid
*/
private Boolean checkCmdParam(Command command, Map<String, String>
cmdParam) {
if (command.getTaskDependType() == TaskDependType.TASK_ONLY ||
command.getTaskDependType() == TaskDependType.TASK_PRE) {
if (cmdParam == null
- || !cmdParam.containsKey(Constants.CMD_PARAM_START_NODE_NAMES)
- ||
cmdParam.get(Constants.CMD_PARAM_START_NODE_NAMES).isEmpty()) {
+ ||
!cmdParam.containsKey(Constants.CMD_PARAM_START_NODE_NAMES)
+ ||
cmdParam.get(Constants.CMD_PARAM_START_NODE_NAMES).isEmpty()) {
logger.error("command node depend type is {}, but start nodes
is null ", command.getTaskDependType());
return false;
}
@@ -646,7 +642,7 @@ public class ProcessService {
* construct process instance according to one command.
*
* @param command command
- * @param host host
+ * @param host host
* @return process instance
*/
private ProcessInstance constructProcessInstance(Command command, String
host) {
@@ -714,7 +710,7 @@ public class ProcessService {
// generate one new process instance
processInstance = generateNewProcessInstance(processDefinition,
command, cmdParam);
}
- if (!checkCmdParam(command, cmdParam)) {
+ if (Boolean.FALSE.equals(checkCmdParam(command, cmdParam))) {
logger.error("command parameter check failed!");
return null;
}
@@ -742,7 +738,7 @@ public class ProcessService {
initTaskInstance(this.findTaskInstanceById(taskId));
}
cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING,
- String.join(Constants.COMMA,
convertIntListToString(failedList)));
+ String.join(Constants.COMMA,
convertIntListToString(failedList)));
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
processInstance.setRunTimes(runTime + 1);
break;
@@ -755,7 +751,7 @@ public class ProcessService {
cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
List<Integer> suspendedNodeList =
this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE);
List<Integer> stopNodeList =
findTaskIdByInstanceState(processInstance.getId(),
- ExecutionStatus.KILL);
+ ExecutionStatus.KILL);
suspendedNodeList.addAll(stopNodeList);
for (Integer taskId : suspendedNodeList) {
// initialize the pause state
@@ -809,7 +805,7 @@ public class ProcessService {
* return complement data if the process start with complement data
*
* @param processInstance processInstance
- * @param command command
+ * @param command command
* @return command type
*/
private CommandType getCommandTypeIfComplement(ProcessInstance
processInstance, Command command) {
@@ -824,8 +820,8 @@ public class ProcessService {
* initialize complement data parameters
*
* @param processDefinition processDefinition
- * @param processInstance processInstance
- * @param cmdParam cmdParam
+ * @param processInstance processInstance
+ * @param cmdParam cmdParam
*/
private void initComplementDataParam(ProcessDefinition processDefinition,
ProcessInstance processInstance,
@@ -835,14 +831,14 @@ public class ProcessService {
}
Date startComplementTime =
DateUtils.parse(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE),
- YYYY_MM_DD_HH_MM_SS);
+ YYYY_MM_DD_HH_MM_SS);
if (Flag.NO == processInstance.getIsSubProcess()) {
processInstance.setScheduleTime(startComplementTime);
}
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
+ processDefinition.getGlobalParamMap(),
+ processDefinition.getGlobalParamList(),
+ CommandType.COMPLEMENT_DATA,
processInstance.getScheduleTime()));
}
@@ -862,7 +858,7 @@ public class ProcessService {
Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
// write sub process id into cmd param.
if (paramMap.containsKey(CMD_PARAM_SUB_PROCESS)
- &&
CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) {
+ &&
CMD_PARAM_EMPTY_SUB_PROCESS.equals(paramMap.get(CMD_PARAM_SUB_PROCESS))) {
paramMap.remove(CMD_PARAM_SUB_PROCESS);
paramMap.put(CMD_PARAM_SUB_PROCESS,
String.valueOf(subProcessInstance.getId()));
subProcessInstance.setCommandParam(JSONUtils.toJsonString(paramMap));
@@ -875,7 +871,7 @@ public class ProcessService {
ProcessInstance parentInstance =
findProcessInstanceDetailById(Integer.parseInt(parentInstanceId));
if (parentInstance != null) {
subProcessInstance.setGlobalParams(
- joinGlobalParams(parentInstance.getGlobalParams(),
subProcessInstance.getGlobalParams()));
+ joinGlobalParams(parentInstance.getGlobalParams(),
subProcessInstance.getGlobalParams()));
this.saveProcessInstance(subProcessInstance);
} else {
logger.error("sub process command params error, cannot find
parent instance: {} ", cmdParam);
@@ -897,7 +893,7 @@ public class ProcessService {
* only the keys doesn't in sub process global would be joined.
*
* @param parentGlobalParams parentGlobalParams
- * @param subGlobalParams subGlobalParams
+ * @param subGlobalParams subGlobalParams
* @return global params join
*/
private String joinGlobalParams(String parentGlobalParams, String
subGlobalParams) {
@@ -922,12 +918,11 @@ public class ProcessService {
*/
private void initTaskInstance(TaskInstance taskInstance) {
- if (!taskInstance.isSubProcess()) {
- if (taskInstance.getState().typeIsCancel() ||
taskInstance.getState().typeIsFailure()) {
- taskInstance.setFlag(Flag.NO);
- updateTaskInstance(taskInstance);
- return;
- }
+ if (!taskInstance.isSubProcess()
+ && (taskInstance.getState().typeIsCancel() ||
taskInstance.getState().typeIsFailure())) {
+ taskInstance.setFlag(Flag.NO);
+ updateTaskInstance(taskInstance);
+ return;
}
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
updateTaskInstance(taskInstance);
@@ -944,12 +939,12 @@ public class ProcessService {
public TaskInstance submitTask(TaskInstance taskInstance) {
ProcessInstance processInstance =
this.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
logger.info("start submit task : {}, instance id:{}, state: {}",
- taskInstance.getName(), taskInstance.getProcessInstanceId(),
processInstance.getState());
+ taskInstance.getName(), taskInstance.getProcessInstanceId(),
processInstance.getState());
//submit to db
TaskInstance task = submitTaskInstanceToDB(taskInstance,
processInstance);
if (task == null) {
logger.error("end submit task to db error, task name:{}, process
id:{} state: {} ",
- taskInstance.getName(), taskInstance.getProcessInstance(),
processInstance.getState());
+ taskInstance.getName(), taskInstance.getProcessInstance(),
processInstance.getState());
return task;
}
if (!task.getState().typeIsFinished()) {
@@ -957,7 +952,7 @@ public class ProcessService {
}
logger.info("end submit task to db successfully:{} state:{} complete,
instance id:{} state: {} ",
- taskInstance.getName(), task.getState(), processInstance.getId(),
processInstance.getState());
+ taskInstance.getName(), task.getState(),
processInstance.getId(), processInstance.getState());
return task;
}
@@ -968,7 +963,7 @@ public class ProcessService {
* set map {parent instance id, task instance id, 0(child instance id)}
*
* @param parentInstance parentInstance
- * @param parentTask parentTask
+ * @param parentTask parentTask
* @return process instance map
*/
private ProcessInstanceMap setProcessInstanceMap(ProcessInstance
parentInstance, TaskInstance parentTask) {
@@ -997,7 +992,7 @@ public class ProcessService {
* find previous task work process map.
*
* @param parentProcessInstance parentProcessInstance
- * @param parentTask parentTask
+ * @param parentTask parentTask
* @return process instance map
*/
private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance
parentProcessInstance,
@@ -1015,7 +1010,7 @@ public class ProcessService {
}
}
logger.info("sub process instance is not found,parent task:{},parent
instance:{}",
- parentTask.getId(), parentProcessInstance.getId());
+ parentTask.getId(), parentProcessInstance.getId());
return null;
}
@@ -1049,10 +1044,6 @@ public class ProcessService {
/**
* complement data needs transform parent parameter to child.
- *
- * @param instanceMap
- * @param parentProcessInstance
- * @return
*/
private String getSubWorkFlowParam(ProcessInstanceMap instanceMap,
ProcessInstance parentProcessInstance) {
// set sub work process command
@@ -1071,11 +1062,6 @@ public class ProcessService {
/**
* create sub work process command
- *
- * @param parentProcessInstance
- * @param childInstance
- * @param instanceMap
- * @param task
*/
public Command createSubProcessCommand(ProcessInstance
parentProcessInstance,
ProcessInstance childInstance,
@@ -1088,25 +1074,23 @@ public class ProcessService {
String processParam = getSubWorkFlowParam(instanceMap,
parentProcessInstance);
return new Command(
- commandType,
- TaskDependType.TASK_POST,
- parentProcessInstance.getFailureStrategy(),
- parentProcessInstance.getExecutorId(),
- childDefineId,
- processParam,
- parentProcessInstance.getWarningType(),
- parentProcessInstance.getWarningGroupId(),
- parentProcessInstance.getScheduleTime(),
- task.getWorkerGroup(),
- parentProcessInstance.getProcessInstancePriority()
+ commandType,
+ TaskDependType.TASK_POST,
+ parentProcessInstance.getFailureStrategy(),
+ parentProcessInstance.getExecutorId(),
+ childDefineId,
+ processParam,
+ parentProcessInstance.getWarningType(),
+ parentProcessInstance.getWarningGroupId(),
+ parentProcessInstance.getScheduleTime(),
+ task.getWorkerGroup(),
+ parentProcessInstance.getProcessInstancePriority()
);
}
/**
* initialize sub work flow state
* child instance state would be initialized when 'recovery from
pause/stop/failure'
- *
- * @param childInstance
*/
private void initSubInstanceState(ProcessInstance childInstance) {
if (childInstance != null) {
@@ -1119,9 +1103,6 @@ public class ProcessService {
* get sub work flow command type
* child instance exist: child command = fatherCommand
* child instance not exists: child command = fatherCommand[0]
- *
- * @param parentProcessInstance
- * @return
*/
private CommandType getSubCommandType(ProcessInstance
parentProcessInstance, ProcessInstance childInstance) {
CommandType commandType = parentProcessInstance.getCommandType();
@@ -1136,7 +1117,7 @@ public class ProcessService {
* update sub process definition
*
* @param parentProcessInstance parentProcessInstance
- * @param childDefinitionId childDefinitionId
+ * @param childDefinitionId childDefinitionId
*/
private void updateSubProcessDefinitionByParent(ProcessInstance
parentProcessInstance, int childDefinitionId) {
ProcessDefinition fatherDefinition =
this.findProcessDefineById(parentProcessInstance.getProcessDefinitionId());
@@ -1150,7 +1131,7 @@ public class ProcessService {
/**
* submit task to mysql
*
- * @param taskInstance taskInstance
+ * @param taskInstance taskInstance
* @param processInstance processInstance
* @return task instance
*/
@@ -1163,7 +1144,7 @@ public class ProcessService {
} else {
if (processInstanceState != ExecutionStatus.READY_STOP
- && processInstanceState != ExecutionStatus.READY_PAUSE) {
+ && processInstanceState !=
ExecutionStatus.READY_PAUSE) {
// failure task set invalid
taskInstance.setFlag(Flag.NO);
updateTaskInstance(taskInstance);
@@ -1204,19 +1185,19 @@ public class ProcessService {
* return stop if work process state is ready stop
* if all of above are not satisfied, return submit success
*
- * @param taskInstance taskInstance
+ * @param taskInstance taskInstance
* @param processInstanceState processInstanceState
* @return process instance state
*/
public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance,
ExecutionStatus processInstanceState) {
ExecutionStatus state = taskInstance.getState();
if (
- // running, delayed or killed
- // the task already exists in task queue
- // return state
- state == ExecutionStatus.RUNNING_EXECUTION
- || state == ExecutionStatus.DELAY_EXECUTION
- || state == ExecutionStatus.KILL
+ // running, delayed or killed
+ // the task already exists in task queue
+ // return state
+ state == ExecutionStatus.RUNNING_EXECUTION
+ || state == ExecutionStatus.DELAY_EXECUTION
+ || state == ExecutionStatus.KILL
) {
return state;
}
@@ -1225,7 +1206,7 @@ public class ProcessService {
if (processInstanceState == ExecutionStatus.READY_PAUSE) {
state = ExecutionStatus.PAUSE;
} else if (processInstanceState == ExecutionStatus.READY_STOP
- || !checkProcessStrategy(taskInstance)) {
+ || !checkProcessStrategy(taskInstance)) {
state = ExecutionStatus.KILL;
} else {
state = ExecutionStatus.SUBMITTED_SUCCESS;
@@ -1380,7 +1361,7 @@ public class ProcessService {
* get id list by task state
*
* @param instanceId instanceId
- * @param state state
+ * @param state state
* @return task instance states
*/
public List<Integer> findTaskIdByInstanceState(int instanceId,
ExecutionStatus state) {
@@ -1435,7 +1416,7 @@ public class ProcessService {
* find work process map by parent process id and parent task id.
*
* @param parentWorkProcessId parentWorkProcessId
- * @param parentTaskId parentTaskId
+ * @param parentTaskId parentTaskId
* @return process instance map
*/
public ProcessInstanceMap findWorkProcessMapByParent(Integer
parentWorkProcessId, Integer parentTaskId) {
@@ -1457,7 +1438,7 @@ public class ProcessService {
* find sub process instance
*
* @param parentProcessId parentProcessId
- * @param parentTaskId parentTaskId
+ * @param parentTaskId parentTaskId
* @return process instance
*/
public ProcessInstance findSubProcessInstance(Integer parentProcessId,
Integer parentTaskId) {
@@ -1489,12 +1470,12 @@ public class ProcessService {
/**
* change task state
*
- * @param state state
- * @param startTime startTime
- * @param host host
+ * @param state state
+ * @param startTime startTime
+ * @param host host
* @param executePath executePath
- * @param logPath logPath
- * @param taskInstId taskInstId
+ * @param logPath logPath
+ * @param taskInstId taskInstId
*/
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus
state, Date startTime, String host,
String executePath,
@@ -1522,12 +1503,12 @@ public class ProcessService {
* update the process instance
*
* @param processInstanceId processInstanceId
- * @param processJson processJson
- * @param globalParams globalParams
- * @param scheduleTime scheduleTime
- * @param flag flag
- * @param locations locations
- * @param connects connects
+ * @param processJson processJson
+ * @param globalParams globalParams
+ * @param scheduleTime scheduleTime
+ * @param flag flag
+ * @param locations locations
+ * @param connects connects
* @return update process instance result
*/
public int updateProcessInstance(Integer processInstanceId, String
processJson,
@@ -1548,10 +1529,10 @@ public class ProcessService {
/**
* change task state
*
- * @param state state
- * @param endTime endTime
+ * @param state state
+ * @param endTime endTime
* @param taskInstId taskInstId
- * @param varPool varPool
+ * @param varPool varPool
*/
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus
state,
Date endTime,
@@ -1577,7 +1558,7 @@ public class ProcessService {
if (intList == null) {
return new ArrayList<>();
}
- List<String> result = new ArrayList<String>(intList.size());
+ List<String> result = new ArrayList<>(intList.size());
for (Integer intVar : intList) {
result.add(String.valueOf(intVar));
}
@@ -1642,7 +1623,7 @@ public class ProcessService {
*/
public List<TaskInstance> queryNeedFailoverTaskInstances(String host) {
return taskInstanceMapper.queryByHostAndStatus(host,
- stateArray);
+ stateArray);
}
/**
@@ -1659,7 +1640,7 @@ public class ProcessService {
* update process instance state by id
*
* @param processInstanceId processInstanceId
- * @param executionStatus executionStatus
+ * @param executionStatus executionStatus
* @return update process result
*/
public int updateProcessInstanceState(Integer processInstanceId,
ExecutionStatus executionStatus) {
@@ -1696,7 +1677,7 @@ public class ProcessService {
/**
* find tenant code by resource name
*
- * @param resName resource name
+ * @param resName resource name
* @param resourceType resource type
* @return tenant code
*/
@@ -1714,35 +1695,35 @@ public class ProcessService {
*/
public List<Schedule> selectAllByProcessDefineId(int[] ids) {
return scheduleMapper.selectAllByProcessDefineArray(
- ids);
+ ids);
}
/**
* get dependency cycle by work process define id and scheduler fire time
*
- * @param masterId masterId
+ * @param masterId masterId
* @param processDefinitionId processDefinitionId
- * @param scheduledFireTime the time the task schedule is expected to
trigger
+ * @param scheduledFireTime the time the task schedule is expected to
trigger
* @return CycleDependency
* @throws Exception if error throws Exception
*/
public CycleDependency getCycleDependency(int masterId, int
processDefinitionId, Date scheduledFireTime) throws Exception {
- List<CycleDependency> list = getCycleDependencies(masterId, new int[]
{processDefinitionId}, scheduledFireTime);
- return list.size() > 0 ? list.get(0) : null;
+ List<CycleDependency> list = getCycleDependencies(masterId, new
int[]{processDefinitionId}, scheduledFireTime);
+ return !list.isEmpty() ? list.get(0) : null;
}
/**
* get dependency cycle list by work process define id list and scheduler
fire time
*
- * @param masterId masterId
- * @param ids ids
+ * @param masterId masterId
+ * @param ids ids
* @param scheduledFireTime the time the task schedule is expected to
trigger
* @return CycleDependency list
* @throws Exception if error throws Exception
*/
public List<CycleDependency> getCycleDependencies(int masterId, int[] ids,
Date scheduledFireTime) throws Exception {
- List<CycleDependency> cycleDependencyList = new
ArrayList<CycleDependency>();
+ List<CycleDependency> cycleDependencyList = new ArrayList<>();
if (null == ids || ids.length == 0) {
logger.warn("ids[] is empty!is invalid!");
return cycleDependencyList;
@@ -1769,14 +1750,10 @@ public class ProcessService {
}
Calendar calendar = Calendar.getInstance();
switch (cycleEnum) {
- /*case MINUTE:
- calendar.add(Calendar.MINUTE,-61);*/
case HOUR:
calendar.add(Calendar.HOUR, -25);
break;
case DAY:
- calendar.add(Calendar.DATE, -32);
- break;
case WEEK:
calendar.add(Calendar.DATE, -32);
break;
@@ -1784,7 +1761,8 @@ public class ProcessService {
calendar.add(Calendar.MONTH, -13);
break;
default:
- logger.warn("Dependent process definition's cycleEnum is
{},not support!!", cycleEnum.name());
+ String cycleName = cycleEnum.name();
+ logger.warn("Dependent process definition's cycleEnum is
{},not support!!", cycleName);
continue;
}
Date start = calendar.getTime();
@@ -1794,7 +1772,7 @@ public class ProcessService {
} else {
list = CronUtils.getFireDateList(start, scheduledFireTime,
depCronExpression);
}
- if (list.size() >= 1) {
+ if (!list.isEmpty()) {
start = list.get(list.size() - 1);
CycleDependency dependency = new
CycleDependency(depSchedule.getProcessDefinitionId(), start,
CronUtils.getExpirationTime(start, cycleEnum), cycleEnum);
cycleDependencyList.add(dependency);
@@ -1813,8 +1791,8 @@ public class ProcessService {
*/
public ProcessInstance findLastSchedulerProcessInterval(int definitionId,
DateInterval dateInterval) {
return processInstanceMapper.queryLastSchedulerProcess(definitionId,
- dateInterval.getStartTime(),
- dateInterval.getEndTime());
+ dateInterval.getStartTime(),
+ dateInterval.getEndTime());
}
/**
@@ -1826,23 +1804,23 @@ public class ProcessService {
*/
public ProcessInstance findLastManualProcessInterval(int definitionId,
DateInterval dateInterval) {
return processInstanceMapper.queryLastManualProcess(definitionId,
- dateInterval.getStartTime(),
- dateInterval.getEndTime());
+ dateInterval.getStartTime(),
+ dateInterval.getEndTime());
}
/**
* find last running process instance
*
* @param definitionId process definition id
- * @param startTime start time
- * @param endTime end time
+ * @param startTime start time
+ * @param endTime end time
* @return process instance
*/
public ProcessInstance findLastRunningProcess(int definitionId, Date
startTime, Date endTime) {
return processInstanceMapper.queryLastRunningProcess(definitionId,
- startTime,
- endTime,
- stateArray);
+ startTime,
+ endTime,
+ stateArray);
}
/**
@@ -1867,6 +1845,7 @@ public class ProcessService {
/**
* query project name and user name by processInstanceId.
+ *
* @param processInstanceId processInstanceId
* @return projectName and userName
*/
@@ -1934,35 +1913,32 @@ public class ProcessService {
/**
* list unauthorized udf function
*
- * @param userId user id
+ * @param userId user id
* @param needChecks data source id array
* @return unauthorized udf function list
*/
public <T> List<T> listUnauthorized(int userId, T[] needChecks,
AuthorizationType authorizationType) {
- List<T> resultList = new ArrayList<T>();
+ List<T> resultList = new ArrayList<>();
if (Objects.nonNull(needChecks) && needChecks.length > 0) {
- Set<T> originResSet = new HashSet<T>(Arrays.asList(needChecks));
+ Set<T> originResSet = new HashSet<>(Arrays.asList(needChecks));
switch (authorizationType) {
case RESOURCE_FILE_ID:
- Set<Integer> authorizedResourceFiles =
resourceMapper.listAuthorizedResourceById(userId, needChecks).stream().map(t ->
t.getId()).collect(toSet());
+ case UDF_FILE:
+ Set<Integer> authorizedResourceFiles =
resourceMapper.listAuthorizedResourceById(userId,
needChecks).stream().map(Resource::getId).collect(toSet());
originResSet.removeAll(authorizedResourceFiles);
break;
case RESOURCE_FILE_NAME:
- Set<String> authorizedResources =
resourceMapper.listAuthorizedResource(userId, needChecks).stream().map(t ->
t.getFullName()).collect(toSet());
+ Set<String> authorizedResources =
resourceMapper.listAuthorizedResource(userId,
needChecks).stream().map(Resource::getFullName).collect(toSet());
originResSet.removeAll(authorizedResources);
break;
- case UDF_FILE:
- Set<Integer> authorizedUdfFiles =
resourceMapper.listAuthorizedResourceById(userId, needChecks).stream().map(t ->
t.getId()).collect(toSet());
- originResSet.removeAll(authorizedUdfFiles);
- break;
case DATASOURCE:
- Set<Integer> authorizedDatasources =
dataSourceMapper.listAuthorizedDataSource(userId, needChecks).stream().map(t ->
t.getId()).collect(toSet());
+ Set<Integer> authorizedDatasources =
dataSourceMapper.listAuthorizedDataSource(userId,
needChecks).stream().map(DataSource::getId).collect(toSet());
originResSet.removeAll(authorizedDatasources);
break;
case UDF:
- Set<Integer> authorizedUdfs =
udfFuncMapper.listAuthorizedUdfFunc(userId, needChecks).stream().map(t ->
t.getId()).collect(toSet());
+ Set<Integer> authorizedUdfs =
udfFuncMapper.listAuthorizedUdfFunc(userId,
needChecks).stream().map(UdfFunc::getId).collect(toSet());
originResSet.removeAll(authorizedUdfs);
break;
default:
@@ -2007,9 +1983,6 @@ public class ProcessService {
/**
* format task app id in task instance
- *
- * @param taskInstance
- * @return
*/
public String formatTaskAppId(TaskInstance taskInstance) {
ProcessDefinition definition =
this.findProcessDefineById(taskInstance.getProcessDefinitionId());
@@ -2019,9 +1992,9 @@ public class ProcessService {
return "";
}
return String.format("%s_%s_%s",
- definition.getId(),
- processInstanceById.getId(),
- taskInstance.getId());
+ definition.getId(),
+ processInstanceById.getId(),
+ taskInstance.getId());
}
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
index 6ac847b..2921ce2 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
@@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.service.quartz;
+package org.apache.dolphinscheduler.service.quartz;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -25,6 +25,9 @@ import
org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.Date;
+
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
@@ -34,8 +37,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
-import java.util.Date;
-
/**
* process schedule job
*/
@@ -46,7 +47,7 @@ public class ProcessScheduleJob implements Job {
*/
private static final Logger logger =
LoggerFactory.getLogger(ProcessScheduleJob.class);
- public ProcessService getProcessService(){
+ public ProcessService getProcessService() {
return SpringApplicationContext.getBean(ProcessService.class);
}
@@ -66,10 +67,8 @@ public class ProcessScheduleJob implements Job {
int projectId = dataMap.getInt(Constants.PROJECT_ID);
int scheduleId = dataMap.getInt(Constants.SCHEDULE_ID);
-
Date scheduledFireTime = context.getScheduledFireTime();
-
Date fireTime = context.getFireTime();
logger.info("scheduled fire time :{}, fire time :{}, process id :{}",
scheduledFireTime, fireTime, scheduleId);
@@ -82,11 +81,10 @@ public class ProcessScheduleJob implements Job {
return;
}
-
ProcessDefinition processDefinition =
getProcessService().findProcessDefineById(schedule.getProcessDefinitionId());
// release state : online/offline
ReleaseState releaseState = processDefinition.getReleaseState();
- if (processDefinition == null || releaseState == ReleaseState.OFFLINE)
{
+ if (releaseState == ReleaseState.OFFLINE) {
logger.warn("process definition does not exist in db or
offline,need not to create command, projectId:{}, processId:{}", projectId,
scheduleId);
return;
}
@@ -107,7 +105,6 @@ public class ProcessScheduleJob implements Job {
getProcessService().createCommand(command);
}
-
/**
* delete job
*/
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
index 3b15810..fd91e40 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
@@ -14,15 +14,76 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.service.quartz;
+import static
org.apache.dolphinscheduler.common.Constants.ORG_POSTGRESQL_DRIVER;
+import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS;
+import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK;
+import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_CLASS;
+import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL;
+import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_DATASOURCE;
+import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS;
+import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_ISCLUSTERED;
+import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD;
+import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_TABLEPREFIX;
+import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_USEPROPERTIES;
+import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_INSTANCEID;
+import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_INSTANCENAME;
+import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON;
+import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_CLASS;
+import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS;
+import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_THREADCOUNT;
+import static
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_THREADPRIORITY;
+import static org.apache.dolphinscheduler.common.Constants.PROJECT_ID;
+import static
org.apache.dolphinscheduler.common.Constants.QUARTZ_ACQUIRETRIGGERSWITHINLOCK;
+import static
org.apache.dolphinscheduler.common.Constants.QUARTZ_CLUSTERCHECKININTERVAL;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_DATASOURCE;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_INSTANCEID;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_INSTANCENAME;
+import static
org.apache.dolphinscheduler.common.Constants.QUARTZ_JOB_GROUP_PRIFIX;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_JOB_PRIFIX;
+import static
org.apache.dolphinscheduler.common.Constants.QUARTZ_MISFIRETHRESHOLD;
+import static
org.apache.dolphinscheduler.common.Constants.QUARTZ_PROPERTIES_PATH;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_TABLE_PREFIX;
+import static org.apache.dolphinscheduler.common.Constants.QUARTZ_THREADCOUNT;
+import static
org.apache.dolphinscheduler.common.Constants.QUARTZ_THREADPRIORITY;
+import static org.apache.dolphinscheduler.common.Constants.SCHEDULE;
+import static org.apache.dolphinscheduler.common.Constants.SCHEDULE_ID;
+import static
org.apache.dolphinscheduler.common.Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME;
+import static org.apache.dolphinscheduler.common.Constants.STRING_FALSE;
+import static org.apache.dolphinscheduler.common.Constants.STRING_TRUE;
+import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
+
+import static org.quartz.CronScheduleBuilder.cronSchedule;
+import static org.quartz.JobBuilder.newJob;
+import static org.quartz.TriggerBuilder.newTrigger;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
+
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.lang.StringUtils;
-import org.apache.dolphinscheduler.common.utils.*;
-import org.apache.dolphinscheduler.dao.entity.Schedule;
-import org.quartz.*;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.quartz.CronTrigger;
+import org.quartz.Job;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.TriggerKey;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.jdbcjobstore.JobStoreTX;
import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate;
@@ -32,300 +93,289 @@ import org.quartz.simpl.SimpleThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.dolphinscheduler.common.Constants.*;
-import static org.quartz.CronScheduleBuilder.cronSchedule;
-import static org.quartz.JobBuilder.newJob;
-import static org.quartz.TriggerBuilder.newTrigger;
-
/**
* single Quartz executors instance
*/
public class QuartzExecutors {
- /**
- * logger of QuartzExecutors
- */
- private static final Logger logger =
LoggerFactory.getLogger(QuartzExecutors.class);
-
- /**
- * read write lock
- */
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
- /**
- * A Scheduler maintains a registry of org.quartz.JobDetail and Trigger.
- */
- private static Scheduler scheduler;
-
- /**
- * load conf
- */
- private static Configuration conf;
-
- private static final class Holder {
- private static final QuartzExecutors instance = new QuartzExecutors();
- }
-
-
- private QuartzExecutors() {
- try {
- conf = new PropertiesConfiguration(QUARTZ_PROPERTIES_PATH);
- init();
- }catch (ConfigurationException e){
- logger.warn("not loaded quartz configuration file, will used default
value",e);
+ /**
+ * logger of QuartzExecutors
+ */
+ private static final Logger logger =
LoggerFactory.getLogger(QuartzExecutors.class);
+
+ /**
+ * read write lock
+ */
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ /**
+ * A Scheduler maintains a registry of org.quartz.JobDetail and Trigger.
+ */
+ private static Scheduler scheduler;
+
+ /**
+ * load conf
+ */
+ private static Configuration conf;
+
+ private static final class Holder {
+ private static final QuartzExecutors instance = new QuartzExecutors();
}
- }
-
- /**
- * thread safe and performance promote
- * @return instance of Quartz Executors
- */
- public static QuartzExecutors getInstance() {
- return Holder.instance;
- }
-
-
- /**
- * init
- *
- * Returns a client-usable handle to a Scheduler.
- */
- private void init() {
- try {
- StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
- Properties properties = new Properties();
-
- String dataSourceDriverClass =
org.apache.dolphinscheduler.dao.utils.PropertyUtils.getString(SPRING_DATASOURCE_DRIVER_CLASS_NAME);
- if (dataSourceDriverClass.equals(ORG_POSTGRESQL_DRIVER)){
-
properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,
PostgreSQLDelegate.class.getName()));
- } else {
-
properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,
StdJDBCDelegate.class.getName()));
- }
- properties.setProperty(ORG_QUARTZ_SCHEDULER_INSTANCENAME,
conf.getString(ORG_QUARTZ_SCHEDULER_INSTANCENAME, QUARTZ_INSTANCENAME));
- properties.setProperty(ORG_QUARTZ_SCHEDULER_INSTANCEID,
conf.getString(ORG_QUARTZ_SCHEDULER_INSTANCEID, QUARTZ_INSTANCEID));
-
properties.setProperty(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON,conf.getString(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON,STRING_TRUE));
-
properties.setProperty(ORG_QUARTZ_JOBSTORE_USEPROPERTIES,conf.getString(ORG_QUARTZ_JOBSTORE_USEPROPERTIES,STRING_FALSE));
-
properties.setProperty(ORG_QUARTZ_THREADPOOL_CLASS,conf.getString(ORG_QUARTZ_THREADPOOL_CLASS,
SimpleThreadPool.class.getName()));
-
properties.setProperty(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS,conf.getString(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS,STRING_TRUE));
-
properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADCOUNT,conf.getString(ORG_QUARTZ_THREADPOOL_THREADCOUNT,
QUARTZ_THREADCOUNT));
-
properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADPRIORITY,conf.getString(ORG_QUARTZ_THREADPOOL_THREADPRIORITY,
QUARTZ_THREADPRIORITY));
-
properties.setProperty(ORG_QUARTZ_JOBSTORE_CLASS,conf.getString(ORG_QUARTZ_JOBSTORE_CLASS,
JobStoreTX.class.getName()));
-
properties.setProperty(ORG_QUARTZ_JOBSTORE_TABLEPREFIX,conf.getString(ORG_QUARTZ_JOBSTORE_TABLEPREFIX,
QUARTZ_TABLE_PREFIX));
-
properties.setProperty(ORG_QUARTZ_JOBSTORE_ISCLUSTERED,conf.getString(ORG_QUARTZ_JOBSTORE_ISCLUSTERED,STRING_TRUE));
-
properties.setProperty(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD,conf.getString(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD,
QUARTZ_MISFIRETHRESHOLD));
-
properties.setProperty(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL,conf.getString(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL,
QUARTZ_CLUSTERCHECKININTERVAL));
-
properties.setProperty(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK,conf.getString(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK,
QUARTZ_ACQUIRETRIGGERSWITHINLOCK));
-
properties.setProperty(ORG_QUARTZ_JOBSTORE_DATASOURCE,conf.getString(ORG_QUARTZ_JOBSTORE_DATASOURCE,
QUARTZ_DATASOURCE));
-
properties.setProperty(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,conf.getString(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,DruidConnectionProvider.class.getName()));
-
- schedulerFactory.initialize(properties);
- scheduler = schedulerFactory.getScheduler();
-
- } catch (SchedulerException e) {
- logger.error(e.getMessage(),e);
- System.exit(1);
+
+ private QuartzExecutors() {
+ try {
+ conf = new PropertiesConfiguration(QUARTZ_PROPERTIES_PATH);
+ init();
+ } catch (ConfigurationException e) {
+ logger.warn("not loaded quartz configuration file, will used
default value", e);
+ }
}
- }
-
- /**
- * Whether the scheduler has been started.
- *
- * @throws SchedulerException scheduler exception
- */
- public void start() throws SchedulerException {
- if (!scheduler.isStarted()){
- scheduler.start();
- logger.info("Quartz service started" );
+ /**
+ * thread safe and performance promote
+ *
+ * @return instance of Quartz Executors
+ */
+ public static QuartzExecutors getInstance() {
+ return Holder.instance;
}
- }
-
- /**
- * stop all scheduled tasks
- *
- * Halts the Scheduler's firing of Triggers,
- * and cleans up all resources associated with the Scheduler.
- *
- * The scheduler cannot be re-started.
- * @throws SchedulerException scheduler exception
- */
- public void shutdown() throws SchedulerException {
- if (!scheduler.isShutdown()) {
- // don't wait for the task to complete
- scheduler.shutdown();
- logger.info("Quartz service stopped, and halt all tasks");
+
+ /**
+ * init
+ * <p>
+ * Returns a client-usable handle to a Scheduler.
+ */
+ private void init() {
+ try {
+ StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
+ Properties properties = new Properties();
+
+ String dataSourceDriverClass =
org.apache.dolphinscheduler.dao.utils.PropertyUtils.getString(SPRING_DATASOURCE_DRIVER_CLASS_NAME);
+ if (dataSourceDriverClass.equals(ORG_POSTGRESQL_DRIVER)) {
+
properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,
conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,
PostgreSQLDelegate.class.getName()));
+ } else {
+
properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,
conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,
StdJDBCDelegate.class.getName()));
+ }
+ properties.setProperty(ORG_QUARTZ_SCHEDULER_INSTANCENAME,
conf.getString(ORG_QUARTZ_SCHEDULER_INSTANCENAME, QUARTZ_INSTANCENAME));
+ properties.setProperty(ORG_QUARTZ_SCHEDULER_INSTANCEID,
conf.getString(ORG_QUARTZ_SCHEDULER_INSTANCEID, QUARTZ_INSTANCEID));
+
properties.setProperty(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON,
conf.getString(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON, STRING_TRUE));
+ properties.setProperty(ORG_QUARTZ_JOBSTORE_USEPROPERTIES,
conf.getString(ORG_QUARTZ_JOBSTORE_USEPROPERTIES, STRING_FALSE));
+ properties.setProperty(ORG_QUARTZ_THREADPOOL_CLASS,
conf.getString(ORG_QUARTZ_THREADPOOL_CLASS, SimpleThreadPool.class.getName()));
+ properties.setProperty(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS,
conf.getString(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS, STRING_TRUE));
+ properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADCOUNT,
conf.getString(ORG_QUARTZ_THREADPOOL_THREADCOUNT, QUARTZ_THREADCOUNT));
+ properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADPRIORITY,
conf.getString(ORG_QUARTZ_THREADPOOL_THREADPRIORITY, QUARTZ_THREADPRIORITY));
+ properties.setProperty(ORG_QUARTZ_JOBSTORE_CLASS,
conf.getString(ORG_QUARTZ_JOBSTORE_CLASS, JobStoreTX.class.getName()));
+ properties.setProperty(ORG_QUARTZ_JOBSTORE_TABLEPREFIX,
conf.getString(ORG_QUARTZ_JOBSTORE_TABLEPREFIX, QUARTZ_TABLE_PREFIX));
+ properties.setProperty(ORG_QUARTZ_JOBSTORE_ISCLUSTERED,
conf.getString(ORG_QUARTZ_JOBSTORE_ISCLUSTERED, STRING_TRUE));
+ properties.setProperty(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD,
conf.getString(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD, QUARTZ_MISFIRETHRESHOLD));
+ properties.setProperty(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL,
conf.getString(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL,
QUARTZ_CLUSTERCHECKININTERVAL));
+
properties.setProperty(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK,
conf.getString(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK,
QUARTZ_ACQUIRETRIGGERSWITHINLOCK));
+ properties.setProperty(ORG_QUARTZ_JOBSTORE_DATASOURCE,
conf.getString(ORG_QUARTZ_JOBSTORE_DATASOURCE, QUARTZ_DATASOURCE));
+
properties.setProperty(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,
conf.getString(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,
DruidConnectionProvider.class.getName()));
+
+ schedulerFactory.initialize(properties);
+ scheduler = schedulerFactory.getScheduler();
+
+ } catch (SchedulerException e) {
+ logger.error(e.getMessage(), e);
+ System.exit(1);
+ }
+
}
- }
-
-
- /**
- * add task trigger , if this task already exists, return this task with
updated trigger
- *
- * @param clazz job class name
- * @param jobName job name
- * @param jobGroupName job group name
- * @param startDate job start date
- * @param endDate job end date
- * @param cronExpression cron expression
- * @param jobDataMap job parameters data map
- */
- public void addJob(Class<? extends Job> clazz,String jobName,String
jobGroupName,Date startDate, Date endDate,
- String cronExpression,
- Map<String, Object> jobDataMap) {
- lock.writeLock().lock();
- try {
-
- JobKey jobKey = new JobKey(jobName, jobGroupName);
- JobDetail jobDetail;
- //add a task (if this task already exists, return this task directly)
- if (scheduler.checkExists(jobKey)) {
-
- jobDetail = scheduler.getJobDetail(jobKey);
- if (jobDataMap != null) {
- jobDetail.getJobDataMap().putAll(jobDataMap);
+
+ /**
+ * Whether the scheduler has been started.
+ *
+ * @throws SchedulerException scheduler exception
+ */
+ public void start() throws SchedulerException {
+ if (!scheduler.isStarted()) {
+ scheduler.start();
+ logger.info("Quartz service started");
}
- } else {
- jobDetail = newJob(clazz).withIdentity(jobKey).build();
+ }
- if (jobDataMap != null) {
- jobDetail.getJobDataMap().putAll(jobDataMap);
+ /**
+ * stop all scheduled tasks
+ * <p>
+ * Halts the Scheduler's firing of Triggers,
+ * and cleans up all resources associated with the Scheduler.
+ * <p>
+ * The scheduler cannot be re-started.
+ *
+ * @throws SchedulerException scheduler exception
+ */
+ public void shutdown() throws SchedulerException {
+ if (!scheduler.isShutdown()) {
+ // don't wait for the task to complete
+ scheduler.shutdown();
+ logger.info("Quartz service stopped, and halt all tasks");
}
+ }
- scheduler.addJob(jobDetail, false, true);
-
- logger.info("Add job, job name: {}, group name: {}",
- jobName, jobGroupName);
- }
-
- TriggerKey triggerKey = new TriggerKey(jobName, jobGroupName);
- /**
- * Instructs the Scheduler that upon a mis-fire
- * situation, the CronTrigger wants to have it's
- * next-fire-time updated to the next time in the schedule after the
- * current time (taking into account any associated Calendar),
- * but it does not want to be fired now.
- */
- CronTrigger cronTrigger =
newTrigger().withIdentity(triggerKey).startAt(startDate).endAt(endDate)
-
.withSchedule(cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing())
- .forJob(jobDetail).build();
-
- if (scheduler.checkExists(triggerKey)) {
- // updateProcessInstance scheduler trigger when scheduler cycle
changes
- CronTrigger oldCronTrigger = (CronTrigger)
scheduler.getTrigger(triggerKey);
- String oldCronExpression = oldCronTrigger.getCronExpression();
-
- if (!StringUtils.equalsIgnoreCase(cronExpression,oldCronExpression))
{
- // reschedule job trigger
- scheduler.rescheduleJob(triggerKey, cronTrigger);
- logger.info("reschedule job trigger, triggerName: {},
triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
- jobName, jobGroupName, cronExpression, startDate, endDate);
- }
- } else {
- scheduler.scheduleJob(cronTrigger);
- logger.info("schedule job trigger, triggerName: {}, triggerGroupName:
{}, cronExpression: {}, startDate: {}, endDate: {}",
- jobName, jobGroupName, cronExpression, startDate, endDate);
- }
-
- } catch (Exception e) {
- logger.error("add job failed", e);
- throw new RuntimeException("add job failed", e);
- } finally {
- lock.writeLock().unlock();
+ /**
+ * add task trigger , if this task already exists, return this task with
updated trigger
+ *
+ * @param clazz job class name
+ * @param jobName job name
+ * @param jobGroupName job group name
+ * @param startDate job start date
+ * @param endDate job end date
+ * @param cronExpression cron expression
+ * @param jobDataMap job parameters data map
+ */
+ public void addJob(Class<? extends Job> clazz, String jobName, String
jobGroupName, Date startDate, Date endDate,
+ String cronExpression,
+ Map<String, Object> jobDataMap) {
+ lock.writeLock().lock();
+ try {
+
+ JobKey jobKey = new JobKey(jobName, jobGroupName);
+ JobDetail jobDetail;
+ //add a task (if this task already exists, return this task
directly)
+ if (scheduler.checkExists(jobKey)) {
+
+ jobDetail = scheduler.getJobDetail(jobKey);
+ if (jobDataMap != null) {
+ jobDetail.getJobDataMap().putAll(jobDataMap);
+ }
+ } else {
+ jobDetail = newJob(clazz).withIdentity(jobKey).build();
+
+ if (jobDataMap != null) {
+ jobDetail.getJobDataMap().putAll(jobDataMap);
+ }
+
+ scheduler.addJob(jobDetail, false, true);
+
+ logger.info("Add job, job name: {}, group name: {}",
+ jobName, jobGroupName);
+ }
+
+ TriggerKey triggerKey = new TriggerKey(jobName, jobGroupName);
+ /**
+ * Instructs the Scheduler that upon a mis-fire
+ * situation, the CronTrigger wants to have it's
+ * next-fire-time updated to the next time in the schedule after
the
+ * current time (taking into account any associated Calendar),
+ * but it does not want to be fired now.
+ */
+ CronTrigger cronTrigger =
newTrigger().withIdentity(triggerKey).startAt(startDate).endAt(endDate)
+
.withSchedule(cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing())
+ .forJob(jobDetail).build();
+
+ if (scheduler.checkExists(triggerKey)) {
+ // updateProcessInstance scheduler trigger when scheduler
cycle changes
+ CronTrigger oldCronTrigger = (CronTrigger)
scheduler.getTrigger(triggerKey);
+ String oldCronExpression = oldCronTrigger.getCronExpression();
+
+ if (!StringUtils.equalsIgnoreCase(cronExpression,
oldCronExpression)) {
+ // reschedule job trigger
+ scheduler.rescheduleJob(triggerKey, cronTrigger);
+ logger.info("reschedule job trigger, triggerName: {},
triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
+ jobName, jobGroupName, cronExpression, startDate,
endDate);
+ }
+ } else {
+ scheduler.scheduleJob(cronTrigger);
+ logger.info("schedule job trigger, triggerName: {},
triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
+ jobName, jobGroupName, cronExpression, startDate,
endDate);
+ }
+
+ } catch (Exception e) {
+ throw new ServiceException("add job failed", e);
+ } finally {
+ lock.writeLock().unlock();
+ }
}
- }
-
-
- /**
- * delete job
- *
- * @param jobName job name
- * @param jobGroupName job group name
- * @return true if the Job was found and deleted.
- */
- public boolean deleteJob(String jobName, String jobGroupName) {
- lock.writeLock().lock();
- try {
- JobKey jobKey = new JobKey(jobName,jobGroupName);
- if(scheduler.checkExists(jobKey)){
- logger.info("try to delete job, job name: {}, job group name: {},",
jobName, jobGroupName);
- return scheduler.deleteJob(jobKey);
- }else {
- return true;
- }
-
- } catch (SchedulerException e) {
- logger.error("delete job : {} failed",jobName, e);
- } finally {
- lock.writeLock().unlock();
+
+ /**
+ * delete job
+ *
+ * @param jobName job name
+ * @param jobGroupName job group name
+ * @return true if the Job was found and deleted.
+ */
+ public boolean deleteJob(String jobName, String jobGroupName) {
+ lock.writeLock().lock();
+ try {
+ JobKey jobKey = new JobKey(jobName, jobGroupName);
+ if (scheduler.checkExists(jobKey)) {
+ logger.info("try to delete job, job name: {}, job group name:
{},", jobName, jobGroupName);
+ return scheduler.deleteJob(jobKey);
+ } else {
+ return true;
+ }
+
+ } catch (SchedulerException e) {
+ logger.error("delete job : {} failed", jobName, e);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ return false;
+ }
+
+ /**
+ * delete all jobs in job group
+ *
+ * @param jobGroupName job group name
+ * @return true if all of the Jobs were found and deleted, false if
+ * one or more were not deleted.
+ */
+ public boolean deleteAllJobs(String jobGroupName) {
+ lock.writeLock().lock();
+ try {
+ logger.info("try to delete all jobs in job group: {}",
jobGroupName);
+ List<JobKey> jobKeys = new ArrayList<>();
+
jobKeys.addAll(scheduler.getJobKeys(GroupMatcher.groupEndsWith(jobGroupName)));
+
+ return scheduler.deleteJobs(jobKeys);
+ } catch (SchedulerException e) {
+ logger.error("delete all jobs in job group: {} failed",
jobGroupName, e);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ return false;
}
- return false;
- }
-
- /**
- * delete all jobs in job group
- *
- * @param jobGroupName job group name
- *
- * @return true if all of the Jobs were found and deleted, false if
- * one or more were not deleted.
- */
- public boolean deleteAllJobs(String jobGroupName) {
- lock.writeLock().lock();
- try {
- logger.info("try to delete all jobs in job group: {}", jobGroupName);
- List<JobKey> jobKeys = new ArrayList<>();
-
jobKeys.addAll(scheduler.getJobKeys(GroupMatcher.groupEndsWith(jobGroupName)));
-
- return scheduler.deleteJobs(jobKeys);
- } catch (SchedulerException e) {
- logger.error("delete all jobs in job group: {} failed",jobGroupName, e);
- } finally {
- lock.writeLock().unlock();
+
+ /**
+ * build job name
+ *
+ * @param processId process id
+ * @return job name
+ */
+ public static String buildJobName(int processId) {
+ StringBuilder sb = new StringBuilder(30);
+ sb.append(QUARTZ_JOB_PRIFIX).append(UNDERLINE).append(processId);
+ return sb.toString();
+ }
+
+ /**
+ * build job group name
+ *
+ * @param projectId project id
+ * @return job group name
+ */
+ public static String buildJobGroupName(int projectId) {
+ StringBuilder sb = new StringBuilder(30);
+ sb.append(QUARTZ_JOB_GROUP_PRIFIX).append(UNDERLINE).append(projectId);
+ return sb.toString();
+ }
+
+ /**
+ * add params to map
+ *
+ * @param projectId project id
+ * @param scheduleId schedule id
+ * @param schedule schedule
+ * @return data map
+ */
+ public static Map<String, Object> buildDataMap(int projectId, int
scheduleId, Schedule schedule) {
+ Map<String, Object> dataMap = new HashMap<>(3);
+ dataMap.put(PROJECT_ID, projectId);
+ dataMap.put(SCHEDULE_ID, scheduleId);
+ dataMap.put(SCHEDULE, JSONUtils.toJsonString(schedule));
+
+ return dataMap;
}
- return false;
- }
-
- /**
- * build job name
- * @param processId process id
- * @return job name
- */
- public static String buildJobName(int processId) {
- StringBuilder sb = new StringBuilder(30);
- sb.append(QUARTZ_JOB_PRIFIX).append(UNDERLINE).append(processId);
- return sb.toString();
- }
-
- /**
- * build job group name
- * @param projectId project id
- * @return job group name
- */
- public static String buildJobGroupName(int projectId) {
- StringBuilder sb = new StringBuilder(30);
- sb.append(QUARTZ_JOB_GROUP_PRIFIX).append(UNDERLINE).append(projectId);
- return sb.toString();
- }
-
- /**
- * add params to map
- *
- * @param projectId project id
- * @param scheduleId schedule id
- * @param schedule schedule
- * @return data map
- */
- public static Map<String, Object> buildDataMap(int projectId, int
scheduleId, Schedule schedule) {
- Map<String, Object> dataMap = new HashMap<>(3);
- dataMap.put(PROJECT_ID, projectId);
- dataMap.put(SCHEDULE_ID, scheduleId);
- dataMap.put(SCHEDULE, JSONUtils.toJsonString(schedule));
-
- return dataMap;
- }
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java
index 0a2e31b..60c8623 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java
@@ -14,159 +14,177 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.service.quartz.cron;
+import org.apache.dolphinscheduler.common.enums.CycleEnum;
+
import com.cronutils.model.Cron;
import com.cronutils.model.field.CronField;
import com.cronutils.model.field.CronFieldName;
-import com.cronutils.model.field.expression.*;
-import org.apache.dolphinscheduler.common.enums.CycleEnum;
+import com.cronutils.model.field.expression.Always;
+import com.cronutils.model.field.expression.And;
+import com.cronutils.model.field.expression.Between;
+import com.cronutils.model.field.expression.Every;
+import com.cronutils.model.field.expression.FieldExpression;
+import com.cronutils.model.field.expression.On;
/**
* Cycle
*/
public abstract class AbstractCycle {
- protected Cron cron;
-
- protected CronField minField;
- protected CronField hourField;
- protected CronField dayOfMonthField;
- protected CronField dayOfWeekField;
- protected CronField monthField;
- protected CronField yearField;
-
- public CycleLinks addCycle(AbstractCycle cycle) {
- return new CycleLinks(this.cron).addCycle(this).addCycle(cycle);
- }
-
- /**
- * cycle constructor
- * @param cron cron
- */
- public AbstractCycle(Cron cron) {
- if (cron == null) {
- throw new IllegalArgumentException("cron must not be null!");
+ protected Cron cron;
+
+ protected CronField minField;
+ protected CronField hourField;
+ protected CronField dayOfMonthField;
+ protected CronField dayOfWeekField;
+ protected CronField monthField;
+ protected CronField yearField;
+
+ public CycleLinks addCycle(AbstractCycle cycle) {
+ return new CycleLinks(this.cron).addCycle(this).addCycle(cycle);
+ }
+
+ /**
+ * cycle constructor
+ *
+ * @param cron cron
+ */
+ protected AbstractCycle(Cron cron) {
+ if (cron == null) {
+ throw new IllegalArgumentException("cron must not be null!");
+ }
+
+ this.cron = cron;
+ this.minField = cron.retrieve(CronFieldName.MINUTE);
+ this.hourField = cron.retrieve(CronFieldName.HOUR);
+ this.dayOfMonthField = cron.retrieve(CronFieldName.DAY_OF_MONTH);
+ this.dayOfWeekField = cron.retrieve(CronFieldName.DAY_OF_WEEK);
+ this.monthField = cron.retrieve(CronFieldName.MONTH);
+ this.yearField = cron.retrieve(CronFieldName.YEAR);
+ }
+
+ /**
+ * whether the minute field has a value
+ *
+ * @return if minute field has a value return true,else return false
+ */
+ protected boolean minFiledIsSetAll() {
+ FieldExpression minFieldExpression = minField.getExpression();
+ return (minFieldExpression instanceof Every || minFieldExpression
instanceof Always
+ || minFieldExpression instanceof Between || minFieldExpression
instanceof And
+ || minFieldExpression instanceof On);
+ }
+
+ /**
+ * whether the minute field has a value of every or always
+ *
+ * @return if minute field has a value of every or always return true,else
return false
+ */
+ protected boolean minFiledIsEvery() {
+ FieldExpression minFieldExpression = minField.getExpression();
+ return (minFieldExpression instanceof Every || minFieldExpression
instanceof Always);
+ }
+
+ /**
+ * whether the hour field has a value
+ *
+ * @return if hour field has a value return true,else return false
+ */
+ protected boolean hourFiledIsSetAll() {
+ FieldExpression hourFieldExpression = hourField.getExpression();
+ return (hourFieldExpression instanceof Every || hourFieldExpression
instanceof Always
+ || hourFieldExpression instanceof Between ||
hourFieldExpression instanceof And
+ || hourFieldExpression instanceof On);
+ }
+
+ /**
+ * whether the hour field has a value of every or always
+ *
+ * @return if hour field has a value of every or always return true,else
return false
+ */
+ protected boolean hourFiledIsEvery() {
+ FieldExpression hourFieldExpression = hourField.getExpression();
+ return (hourFieldExpression instanceof Every || hourFieldExpression
instanceof Always);
+ }
+
+ /**
+ * whether the day Of month field has a value
+ *
+ * @return if day Of month field has a value return true,else return false
+ */
+ protected boolean dayOfMonthFieldIsSetAll() {
+ return (dayOfMonthField.getExpression() instanceof Every ||
dayOfMonthField.getExpression() instanceof Always
+ || dayOfMonthField.getExpression() instanceof Between ||
dayOfMonthField.getExpression() instanceof And
+ || dayOfMonthField.getExpression() instanceof On);
+ }
+
+ /**
+ * whether the day Of Month field has a value of every or always
+ *
+ * @return if day Of Month field has a value of every or always return
true,else return false
+ */
+ protected boolean dayOfMonthFieldIsEvery() {
+ return (dayOfMonthField.getExpression() instanceof Every ||
dayOfMonthField.getExpression() instanceof Always);
+ }
+
+ /**
+ * whether month field has a value
+ *
+ * @return if month field has a value return true,else return false
+ */
+ protected boolean monthFieldIsSetAll() {
+ FieldExpression monthFieldExpression = monthField.getExpression();
+ return (monthFieldExpression instanceof Every || monthFieldExpression
instanceof Always
+ || monthFieldExpression instanceof Between ||
monthFieldExpression instanceof And
+ || monthFieldExpression instanceof On);
+ }
+
+ /**
+ * whether the month field has a value of every or always
+ *
+ * @return if month field has a value of every or always return true,else
return false
+ */
+ protected boolean monthFieldIsEvery() {
+ FieldExpression monthFieldExpression = monthField.getExpression();
+ return (monthFieldExpression instanceof Every || monthFieldExpression
instanceof Always);
+ }
+
+ /**
+ * whether the day Of week field has a value
+ *
+ * @return if day Of week field has a value return true,else return false
+ */
+ protected boolean dayofWeekFieldIsSetAll() {
+ FieldExpression dayOfWeekFieldExpression =
dayOfWeekField.getExpression();
+ return (dayOfWeekFieldExpression instanceof Every ||
dayOfWeekFieldExpression instanceof Always
+ || dayOfWeekFieldExpression instanceof Between ||
dayOfWeekFieldExpression instanceof And
+ || dayOfWeekFieldExpression instanceof On);
+ }
+
+ /**
+ * whether the day Of week field has a value of every or always
+ *
+ * @return if day Of week field has a value of every or always return
true,else return false
+ */
+ protected boolean dayofWeekFieldIsEvery() {
+ FieldExpression dayOfWeekFieldExpression =
dayOfWeekField.getExpression();
+ return (dayOfWeekFieldExpression instanceof Every ||
dayOfWeekFieldExpression instanceof Always);
}
- this.cron = cron;
- this.minField = cron.retrieve(CronFieldName.MINUTE);
- this.hourField = cron.retrieve(CronFieldName.HOUR);
- this.dayOfMonthField = cron.retrieve(CronFieldName.DAY_OF_MONTH);
- this.dayOfWeekField = cron.retrieve(CronFieldName.DAY_OF_WEEK);
- this.monthField = cron.retrieve(CronFieldName.MONTH);
- this.yearField = cron.retrieve(CronFieldName.YEAR);
- }
-
- /**
- * whether the minute field has a value
- * @return if minute field has a value return true,else return false
- */
- protected boolean minFiledIsSetAll(){
- FieldExpression minFieldExpression = minField.getExpression();
- return (minFieldExpression instanceof Every || minFieldExpression
instanceof Always
- || minFieldExpression instanceof Between || minFieldExpression
instanceof And
- || minFieldExpression instanceof On);
- }
-
-
- /**
- * whether the minute field has a value of every or always
- * @return if minute field has a value of every or always return true,else
return false
- */
- protected boolean minFiledIsEvery(){
- FieldExpression minFieldExpression = minField.getExpression();
- return (minFieldExpression instanceof Every || minFieldExpression
instanceof Always);
- }
-
- /**
- * whether the hour field has a value
- * @return if hour field has a value return true,else return false
- */
- protected boolean hourFiledIsSetAll(){
- FieldExpression hourFieldExpression = hourField.getExpression();
- return (hourFieldExpression instanceof Every || hourFieldExpression
instanceof Always
- || hourFieldExpression instanceof Between || hourFieldExpression
instanceof And
- || hourFieldExpression instanceof On);
- }
-
- /**
- * whether the hour field has a value of every or always
- * @return if hour field has a value of every or always return true,else
return false
- */
- protected boolean hourFiledIsEvery(){
- FieldExpression hourFieldExpression = hourField.getExpression();
- return (hourFieldExpression instanceof Every || hourFieldExpression
instanceof Always);
- }
-
- /**
- * whether the day Of month field has a value
- * @return if day Of month field has a value return true,else return false
- */
- protected boolean dayOfMonthFieldIsSetAll(){
- return (dayOfMonthField.getExpression() instanceof Every ||
dayOfMonthField.getExpression() instanceof Always
- || dayOfMonthField.getExpression() instanceof Between ||
dayOfMonthField.getExpression() instanceof And
- || dayOfMonthField.getExpression() instanceof On);
- }
-
-
- /**
- * whether the day Of Month field has a value of every or always
- * @return if day Of Month field has a value of every or always return
true,else return false
- */
- protected boolean dayOfMonthFieldIsEvery(){
- return (dayOfMonthField.getExpression() instanceof Every ||
dayOfMonthField.getExpression() instanceof Always);
- }
-
- /**
- * whether month field has a value
- * @return if month field has a value return true,else return false
- */
- protected boolean monthFieldIsSetAll(){
- FieldExpression monthFieldExpression = monthField.getExpression();
- return (monthFieldExpression instanceof Every || monthFieldExpression
instanceof Always
- || monthFieldExpression instanceof Between || monthFieldExpression
instanceof And
- || monthFieldExpression instanceof On);
- }
-
- /**
- * whether the month field has a value of every or always
- * @return if month field has a value of every or always return true,else
return false
- */
- protected boolean monthFieldIsEvery(){
- FieldExpression monthFieldExpression = monthField.getExpression();
- return (monthFieldExpression instanceof Every || monthFieldExpression
instanceof Always);
- }
-
- /**
- * whether the day Of week field has a value
- * @return if day Of week field has a value return true,else return false
- */
- protected boolean dayofWeekFieldIsSetAll(){
- FieldExpression dayOfWeekFieldExpression = dayOfWeekField.getExpression();
- return (dayOfWeekFieldExpression instanceof Every ||
dayOfWeekFieldExpression instanceof Always
- || dayOfWeekFieldExpression instanceof Between ||
dayOfWeekFieldExpression instanceof And
- || dayOfWeekFieldExpression instanceof On);
- }
-
- /**
- * whether the day Of week field has a value of every or always
- * @return if day Of week field has a value of every or always return
true,else return false
- */
- protected boolean dayofWeekFieldIsEvery(){
- FieldExpression dayOfWeekFieldExpression = dayOfWeekField.getExpression();
- return (dayOfWeekFieldExpression instanceof Every ||
dayOfWeekFieldExpression instanceof Always);
- }
-
- /**
- * get cycle enum
- * @return CycleEnum
- */
- protected abstract CycleEnum getCycle();
-
- /**
- * get mini level cycle enum
- * @return CycleEnum
- */
- protected abstract CycleEnum getMiniCycle();
+ /**
+ * get cycle enum
+ *
+ * @return CycleEnum
+ */
+ protected abstract CycleEnum getCycle();
+
+ /**
+ * get mini level cycle enum
+ *
+ * @return CycleEnum
+ */
+ protected abstract CycleEnum getMiniCycle();
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
index 8a7d891..37d8f10 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
@@ -14,322 +14,329 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.service.zk;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.COLON;
+import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
+import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
+import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.ResInfo;
import org.apache.dolphinscheduler.common.utils.StringUtils;
+
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
-import java.util.*;
-
-import static org.apache.dolphinscheduler.common.Constants.*;
-
/**
* abstract zookeeper client
*/
@Component
public abstract class AbstractZKClient extends ZookeeperCachedOperator {
- private static final Logger logger =
LoggerFactory.getLogger(AbstractZKClient.class);
-
-
- /**
- * remove dead server by host
- * @param host host
- * @param serverType serverType
- * @throws Exception
- */
- public void removeDeadServerByHost(String host, String serverType)
throws Exception {
- List<String> deadServers =
super.getChildrenKeys(getDeadZNodeParentPath());
- for(String serverPath : deadServers){
- if(serverPath.startsWith(serverType+UNDERLINE+host)){
- String server = getDeadZNodeParentPath() +
SINGLE_SLASH + serverPath;
- super.remove(server);
- logger.info("{} server {} deleted from zk dead
server path success" , serverType , host);
- }
- }
- }
-
-
- /**
- * opType(add): if find dead server , then add to zk deadServerPath
- * opType(delete): delete path from zk
- *
- * @param zNode node path
- * @param zkNodeType master or worker
- * @param opType delete or add
- * @throws Exception errors
- */
- public void handleDeadServer(String zNode, ZKNodeType zkNodeType,
String opType) throws Exception {
- String host = getHostByEventDataPath(zNode);
- String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX
: WORKER_PREFIX;
-
- //check server restart, if restart , dead server path in zk
should be delete
- if(opType.equals(DELETE_ZK_OP)){
- removeDeadServerByHost(host, type);
-
- }else if(opType.equals(ADD_ZK_OP)){
- String deadServerPath = getDeadZNodeParentPath() +
SINGLE_SLASH + type + UNDERLINE + host;
- if(!super.isExisted(deadServerPath)){
- //add dead server info to zk dead server path :
/dead-servers/
-
- super.persist(deadServerPath,(type + UNDERLINE
+ host));
-
- logger.info("{} server dead , and {} added to
zk dead server path success" ,
- zkNodeType.toString(), zNode);
- }
- }
-
- }
-
- /**
- * get active master num
- * @return active master number
- */
- public int getActiveMasterNum(){
- List<String> childrenList = new ArrayList<>();
- try {
- // read master node parent path from conf
-
if(super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))){
- childrenList =
super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER));
- }
- } catch (Exception e) {
- logger.error("getActiveMasterNum error",e);
- }
- return childrenList.size();
- }
-
- /**
- *
- * @return zookeeper quorum
- */
- public String getZookeeperQuorum(){
- return getZookeeperConfig().getServerList();
- }
-
- /**
- * get server list.
- * @param zkNodeType zookeeper node type
- * @return server list
- */
- public List<Server> getServersList(ZKNodeType zkNodeType){
- Map<String, String> masterMap = getServerMaps(zkNodeType);
- String parentPath = getZNodeParentPath(zkNodeType);
-
- List<Server> masterServers = new ArrayList<>();
- for (Map.Entry<String, String> entry : masterMap.entrySet()) {
- Server masterServer =
ResInfo.parseHeartbeatForZKInfo(entry.getValue());
- if(masterServer == null){
- continue;
- }
- String key = entry.getKey();
- masterServer.setZkDirectory(parentPath + "/"+ key);
- //set host and port
- String[] hostAndPort=key.split(COLON);
- String[] hosts=hostAndPort[0].split(DIVISION_STRING);
- // fetch the last one
- masterServer.setHost(hosts[hosts.length-1]);
- masterServer.setPort(Integer.parseInt(hostAndPort[1]));
- masterServers.add(masterServer);
- }
- return masterServers;
- }
-
- /**
- * get master server list map.
- * @param zkNodeType zookeeper node type
- * @return result : {host : resource info}
- */
- public Map<String, String> getServerMaps(ZKNodeType zkNodeType){
-
- Map<String, String> masterMap = new HashMap<>();
- try {
- String path = getZNodeParentPath(zkNodeType);
- List<String> serverList = super.getChildrenKeys(path);
- if(zkNodeType == ZKNodeType.WORKER){
- List<String> workerList = new ArrayList<>();
- for(String group : serverList){
- List<String> groupServers =
super.getChildrenKeys(path + Constants.SLASH + group);
- for(String groupServer : groupServers){
- workerList.add(group + Constants.SLASH
+ groupServer);
- }
- }
- serverList = workerList;
- }
- for(String server : serverList){
- masterMap.putIfAbsent(server, super.get(path +
Constants.SLASH + server));
- }
- } catch (Exception e) {
- logger.error("get server list failed", e);
- }
-
- return masterMap;
- }
-
- /**
- * check the zookeeper node already exists
- * @param host host
- * @param zkNodeType zookeeper node type
- * @return true if exists
- */
- public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) {
- String path = getZNodeParentPath(zkNodeType);
- if(StringUtils.isEmpty(path)){
- logger.error("check zk node exists error, host:{}, zk
node type:{}",
- host, zkNodeType.toString());
- return false;
- }
- Map<String, String> serverMaps = getServerMaps(zkNodeType);
- for(String hostKey : serverMaps.keySet()){
- if(hostKey.contains(host)){
- return true;
- }
- }
- return false;
- }
-
- /**
- *
- * @return get worker node parent path
- */
- protected String getWorkerZNodeParentPath(){
- return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
- }
-
- /**
- *
- * @return get master node parent path
- */
- protected String getMasterZNodeParentPath(){
- return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS;
- }
-
- /**
- *
- * @return get master lock path
- */
- public String getMasterLockPath(){
- return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS;
- }
-
- /**
- *
- * @param zkNodeType zookeeper node type
- * @return get zookeeper node parent path
- */
- public String getZNodeParentPath(ZKNodeType zkNodeType) {
- String path = "";
- switch (zkNodeType){
- case MASTER:
- return getMasterZNodeParentPath();
- case WORKER:
- return getWorkerZNodeParentPath();
- case DEAD_SERVER:
- return getDeadZNodeParentPath();
- default:
- break;
- }
- return path;
- }
-
- /**
- *
- * @return get dead server node parent path
- */
- protected String getDeadZNodeParentPath(){
- return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
- }
-
- /**
- *
- * @return get master start up lock path
- */
- public String getMasterStartUpLockPath(){
- return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
- }
-
- /**
- *
- * @return get master failover lock path
- */
- public String getMasterFailoverLockPath(){
- return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
- }
-
- /**
- *
- * @return get worker failover lock path
- */
- public String getWorkerFailoverLockPath(){
- return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
- }
-
- /**
- * release mutex
- * @param mutex mutex
- */
- public void releaseMutex(InterProcessMutex mutex) {
- if (mutex != null){
- try {
- mutex.release();
- } catch (Exception e) {
- if("instance must be started before calling
this method".equals(e.getMessage())){
- logger.warn("lock release");
- }else{
- logger.error("lock release failed",e);
- }
-
- }
- }
- }
-
- /**
- * init system znode
- */
- protected void initSystemZNode(){
- try {
- persist(getMasterZNodeParentPath(), "");
- persist(getWorkerZNodeParentPath(), "");
- persist(getDeadZNodeParentPath(), "");
-
- logger.info("initialize server nodes success.");
- } catch (Exception e) {
- logger.error("init system znode failed",e);
- }
- }
-
- /**
- * get host ip, string format: masterParentPath/ip
- * @param path path
- * @return host ip, string format: masterParentPath/ip
- */
- protected String getHostByEventDataPath(String path) {
- if(StringUtils.isEmpty(path)){
- logger.error("empty path!");
- return "";
- }
- String[] pathArray = path.split(SINGLE_SLASH);
- if(pathArray.length < 1){
- logger.error("parse ip error: {}", path);
- return "";
- }
- return pathArray[pathArray.length - 1];
-
- }
-
- @Override
- public String toString() {
- return "AbstractZKClient{" +
- "zkClient=" + getZkClient() +
- ", deadServerZNodeParentPath='" +
getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\'' +
- ", masterZNodeParentPath='" +
getZNodeParentPath(ZKNodeType.MASTER) + '\'' +
- ", workerZNodeParentPath='" +
getZNodeParentPath(ZKNodeType.WORKER) + '\'' +
- '}';
- }
-}
\ No newline at end of file
+ private static final Logger logger =
LoggerFactory.getLogger(AbstractZKClient.class);
+
+ /**
+ * remove dead server by host
+ *
+ * @param host host
+ * @param serverType serverType
+ */
+ public void removeDeadServerByHost(String host, String serverType) {
+ List<String> deadServers =
super.getChildrenKeys(getDeadZNodeParentPath());
+ for (String serverPath : deadServers) {
+ if (serverPath.startsWith(serverType + UNDERLINE + host)) {
+ String server = getDeadZNodeParentPath() + SINGLE_SLASH +
serverPath;
+ super.remove(server);
+ logger.info("{} server {} deleted from zk dead server path
success", serverType, host);
+ }
+ }
+ }
+
+ /**
+ * opType(add): if find dead server , then add to zk deadServerPath
+ * opType(delete): delete path from zk
+ *
+ * @param zNode node path
+ * @param zkNodeType master or worker
+ * @param opType delete or add
+ */
+ public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String
opType) {
+ String host = getHostByEventDataPath(zNode);
+ String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX :
WORKER_PREFIX;
+
+ //check server restart, if restart , dead server path in zk should be
delete
+ if (opType.equals(DELETE_ZK_OP)) {
+ removeDeadServerByHost(host, type);
+
+ } else if (opType.equals(ADD_ZK_OP)) {
+ String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH +
type + UNDERLINE + host;
+ if (!super.isExisted(deadServerPath)) {
+ //add dead server info to zk dead server path : /dead-servers/
+
+ super.persist(deadServerPath, (type + UNDERLINE + host));
+
+ logger.info("{} server dead , and {} added to zk dead server
path success",
+ zkNodeType, zNode);
+ }
+ }
+
+ }
+
+ /**
+ * get active master num
+ *
+ * @return active master number
+ */
+ public int getActiveMasterNum() {
+ List<String> childrenList = new ArrayList<>();
+ try {
+ // read master node parent path from conf
+ if (super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))) {
+ childrenList =
super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER));
+ }
+ } catch (Exception e) {
+ logger.error("getActiveMasterNum error", e);
+ }
+ return childrenList.size();
+ }
+
+ /**
+ * @return zookeeper quorum
+ */
+ public String getZookeeperQuorum() {
+ return getZookeeperConfig().getServerList();
+ }
+
+ /**
+ * get server list.
+ *
+ * @param zkNodeType zookeeper node type
+ * @return server list
+ */
+ public List<Server> getServersList(ZKNodeType zkNodeType) {
+ Map<String, String> masterMap = getServerMaps(zkNodeType);
+ String parentPath = getZNodeParentPath(zkNodeType);
+
+ List<Server> masterServers = new ArrayList<>();
+ for (Map.Entry<String, String> entry : masterMap.entrySet()) {
+ Server masterServer =
ResInfo.parseHeartbeatForZKInfo(entry.getValue());
+ if (masterServer == null) {
+ continue;
+ }
+ String key = entry.getKey();
+ masterServer.setZkDirectory(parentPath + "/" + key);
+ //set host and port
+ String[] hostAndPort = key.split(COLON);
+ String[] hosts = hostAndPort[0].split(DIVISION_STRING);
+ // fetch the last one
+ masterServer.setHost(hosts[hosts.length - 1]);
+ masterServer.setPort(Integer.parseInt(hostAndPort[1]));
+ masterServers.add(masterServer);
+ }
+ return masterServers;
+ }
+
+ /**
+ * get master server list map.
+ *
+ * @param zkNodeType zookeeper node type
+ * @return result : {host : resource info}
+ */
+ public Map<String, String> getServerMaps(ZKNodeType zkNodeType) {
+
+ Map<String, String> masterMap = new HashMap<>();
+ try {
+ String path = getZNodeParentPath(zkNodeType);
+ List<String> serverList = super.getChildrenKeys(path);
+ if (zkNodeType == ZKNodeType.WORKER) {
+ List<String> workerList = new ArrayList<>();
+ for (String group : serverList) {
+ List<String> groupServers = super.getChildrenKeys(path +
Constants.SLASH + group);
+ for (String groupServer : groupServers) {
+ workerList.add(group + Constants.SLASH + groupServer);
+ }
+ }
+ serverList = workerList;
+ }
+ for (String server : serverList) {
+ masterMap.putIfAbsent(server, super.get(path + Constants.SLASH
+ server));
+ }
+ } catch (Exception e) {
+ logger.error("get server list failed", e);
+ }
+
+ return masterMap;
+ }
+
+ /**
+ * check the zookeeper node already exists
+ *
+ * @param host host
+ * @param zkNodeType zookeeper node type
+ * @return true if exists
+ */
+ public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) {
+ String path = getZNodeParentPath(zkNodeType);
+ if (StringUtils.isEmpty(path)) {
+ logger.error("check zk node exists error, host:{}, zk node
type:{}",
+ host, zkNodeType);
+ return false;
+ }
+ Map<String, String> serverMaps = getServerMaps(zkNodeType);
+ for (String hostKey : serverMaps.keySet()) {
+ if (hostKey.contains(host)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * @return get worker node parent path
+ */
+ protected String getWorkerZNodeParentPath() {
+ return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
+ }
+
+ /**
+ * @return get master node parent path
+ */
+ protected String getMasterZNodeParentPath() {
+ return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS;
+ }
+
+ /**
+ * @return get master lock path
+ */
+ public String getMasterLockPath() {
+ return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS;
+ }
+
+ /**
+ * @param zkNodeType zookeeper node type
+ * @return get zookeeper node parent path
+ */
+ public String getZNodeParentPath(ZKNodeType zkNodeType) {
+ String path = "";
+ switch (zkNodeType) {
+ case MASTER:
+ return getMasterZNodeParentPath();
+ case WORKER:
+ return getWorkerZNodeParentPath();
+ case DEAD_SERVER:
+ return getDeadZNodeParentPath();
+ default:
+ break;
+ }
+ return path;
+ }
+
+ /**
+ * @return get dead server node parent path
+ */
+ protected String getDeadZNodeParentPath() {
+ return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
+ }
+
+ /**
+ * @return get master start up lock path
+ */
+ public String getMasterStartUpLockPath() {
+ return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
+ }
+
+ /**
+ * @return get master failover lock path
+ */
+ public String getMasterFailoverLockPath() {
+ return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
+ }
+
+ /**
+ * @return get worker failover lock path
+ */
+ public String getWorkerFailoverLockPath() {
+ return getZookeeperConfig().getDsRoot() +
Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
+ }
+
+ /**
+ * release mutex
+ *
+ * @param mutex mutex
+ */
+ public void releaseMutex(InterProcessMutex mutex) {
+ if (mutex != null) {
+ try {
+ mutex.release();
+ } catch (Exception e) {
+ if ("instance must be started before calling this
method".equals(e.getMessage())) {
+ logger.warn("lock release");
+ } else {
+ logger.error("lock release failed", e);
+ }
+
+ }
+ }
+ }
+
+ /**
+ * init system znode
+ */
+ protected void initSystemZNode() {
+ try {
+ persist(getMasterZNodeParentPath(), "");
+ persist(getWorkerZNodeParentPath(), "");
+ persist(getDeadZNodeParentPath(), "");
+
+ logger.info("initialize server nodes success.");
+ } catch (Exception e) {
+ logger.error("init system znode failed", e);
+ }
+ }
+
+ /**
+ * get host ip, string format: masterParentPath/ip
+ *
+ * @param path path
+ * @return host ip, string format: masterParentPath/ip
+ */
+ protected String getHostByEventDataPath(String path) {
+ if (StringUtils.isEmpty(path)) {
+ logger.error("empty path!");
+ return "";
+ }
+ String[] pathArray = path.split(SINGLE_SLASH);
+ if (pathArray.length < 1) {
+ logger.error("parse ip error: {}", path);
+ return "";
+ }
+ return pathArray[pathArray.length - 1];
+
+ }
+
+ @Override
+ public String toString() {
+ return "AbstractZKClient{"
+ + "zkClient=" + getZkClient()
+ + ", deadServerZNodeParentPath='" +
getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\''
+ + ", masterZNodeParentPath='" +
getZNodeParentPath(ZKNodeType.MASTER) + '\''
+ + ", workerZNodeParentPath='" +
getZNodeParentPath(ZKNodeType.WORKER) + '\''
+ + '}';
+ }
+}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java
index 5a04c5a..e25a22f 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java
@@ -14,9 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.service.zk;
-import org.apache.commons.lang.StringUtils;
+import static
org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
+
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
@@ -25,18 +30,16 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static
org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
-
/**
* Shared Curator zookeeper client
*/
@@ -49,7 +52,6 @@ public class CuratorZookeeperClient implements
InitializingBean {
private CuratorFramework zkClient;
-
@Override
public void afterPropertiesSet() throws Exception {
this.zkClient = buildClient();
@@ -91,7 +93,7 @@ public class CuratorZookeeperClient implements
InitializingBean {
zkClient.blockUntilConnected(30, TimeUnit.SECONDS);
} catch (final Exception ex) {
- throw new RuntimeException(ex);
+ throw new ServiceException(ex);
}
return zkClient;
}
@@ -123,4 +125,4 @@ public class CuratorZookeeperClient implements
InitializingBean {
public CuratorFramework getZkClient() {
return zkClient;
}
-}
\ No newline at end of file
+}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
index c7a53eb..7ac23a3 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
@@ -14,19 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.service.zk;
import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
+
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* just speed experience version
@@ -51,10 +54,10 @@ public class ZKServer {
ZKServer zkServer;
if (args.length == 0) {
zkServer = new ZKServer();
- } else if (args.length == 1){
- zkServer = new ZKServer(Integer.valueOf(args[0]), "");
+ } else if (args.length == 1) {
+ zkServer = new ZKServer(Integer.parseInt(args[0]), "");
} else {
- zkServer = new ZKServer(Integer.valueOf(args[0]), args[1]);
+ zkServer = new ZKServer(Integer.parseInt(args[0]), args[1]);
}
zkServer.registerHook();
zkServer.start();
@@ -73,7 +76,7 @@ public class ZKServer {
}
private void registerHook() {
- /**
+ /*
* register hooks, which are called before the process exits
*/
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
@@ -90,7 +93,7 @@ public class ZKServer {
}
}
- public boolean isStarted(){
+ public boolean isStarted() {
return isStarted.get();
}
@@ -119,19 +122,19 @@ public class ZKServer {
if (file.exists()) {
logger.warn("The path of zk server exists");
}
- logger.info("zk server starting, data dir path:{}" , zkDataDir);
- startLocalZkServer(port, zkDataDir,
ZooKeeperServer.DEFAULT_TICK_TIME,"60");
+ logger.info("zk server starting, data dir path:{}", zkDataDir);
+ startLocalZkServer(port, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME,
"60");
}
/**
* Starts a local Zk instance
*
- * @param port The port to listen on
+ * @param port The port to listen on
* @param dataDirPath The path for the Zk data directory
- * @param tickTime zk tick time
- * @param maxClientCnxns zk max client connections
+ * @param tickTime zk tick time
+ * @param maxClientCnxns zk max client connections
*/
- private void startLocalZkServer(final int port, final String
dataDirPath,final int tickTime,String maxClientCnxns) {
+ private void startLocalZkServer(final int port, final String dataDirPath,
final int tickTime, String maxClientCnxns) {
if (isStarted.compareAndSet(false, true)) {
zooKeeperServerMain = new PublicZooKeeperServerMain();
logger.info("Zookeeper data path : {} ", dataDirPath);
@@ -144,8 +147,7 @@ public class ZKServer {
zooKeeperServerMain.initializeAndRun(args);
} catch (QuorumPeerConfig.ConfigException | IOException e) {
- logger.warn("Caught exception while starting ZK", e);
- throw new RuntimeException(e);
+ throw new ServiceException("Caught exception while starting
ZK", e);
}
}
}
@@ -159,7 +161,7 @@ public class ZKServer {
logger.info("zk server stopped");
} catch (Exception e) {
- logger.error("Failed to stop ZK ",e);
+ logger.error("Failed to stop ZK ", e);
}
}
@@ -180,8 +182,7 @@ public class ZKServer {
org.apache.commons.io.FileUtils.deleteDirectory(new
File(dataDir));
}
} catch (Exception e) {
- logger.warn("Caught exception while stopping ZK server", e);
- throw new RuntimeException(e);
+ throw new ServiceException("Caught exception while starting
ZK", e);
}
}
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
index 6dfce79..88c339b 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
@@ -14,21 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.service.zk;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+
+import java.nio.charset.StandardCharsets;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
-import java.nio.charset.StandardCharsets;
-
@Component
public class ZookeeperCachedOperator extends ZookeeperOperator {
@@ -36,6 +39,7 @@ public class ZookeeperCachedOperator extends
ZookeeperOperator {
private TreeCache treeCache;
+
/**
* register a unified listener of /${dsRoot},
*/
@@ -59,14 +63,16 @@ public class ZookeeperCachedOperator extends
ZookeeperOperator {
treeCache.start();
} catch (Exception e) {
logger.error("add listener to zk path: {} failed",
getZookeeperConfig().getDsRoot());
- throw new RuntimeException(e);
+ throw new ServiceException(e);
}
}
//for sub class
- protected void dataChanged(final CuratorFramework client, final
TreeCacheEvent event, final String path){}
+ protected void dataChanged(final CuratorFramework client, final
TreeCacheEvent event, final String path) {
+ // Used by sub class
+ }
- public String getFromCache(final String cachePath, final String key) {
+ public String getFromCache(final String key) {
ChildData resultInCache = treeCache.getCurrentData(key);
if (null != resultInCache) {
return null == resultInCache.getData() ? null : new
String(resultInCache.getData(), StandardCharsets.UTF_8);
@@ -74,11 +80,11 @@ public class ZookeeperCachedOperator extends
ZookeeperOperator {
return null;
}
- public TreeCache getTreeCache(final String cachePath) {
+ public TreeCache getTreeCache() {
return treeCache;
}
- public void addListener(TreeCacheListener listener){
+ public void addListener(TreeCacheListener listener) {
this.treeCache.getListenable().addListener(listener);
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
index e7b049f..8a21983 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
@@ -14,13 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.service.zk;
-import org.apache.commons.lang.StringUtils;
+import static
org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
+
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
-import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
@@ -29,18 +33,16 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static
org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
-
/**
* zk base operator
*/
@@ -64,19 +66,23 @@ public class ZookeeperOperator implements InitializingBean {
/**
* this method is for sub class,
*/
- protected void registerListener(){}
+ protected void registerListener() {
+ // Used by sub class
+ }
- protected void treeCacheStart(){}
+ protected void treeCacheStart() {
+ // Used by sub class
+ }
public void initStateLister() {
checkNotNull(zkClient);
zkClient.getConnectionStateListenable().addListener((client, newState)
-> {
- if(newState == ConnectionState.LOST){
+ if (newState == ConnectionState.LOST) {
logger.error("connection lost from zookeeper");
- } else if(newState == ConnectionState.RECONNECTED){
+ } else if (newState == ConnectionState.RECONNECTED) {
logger.info("reconnected to zookeeper");
- } else if(newState == ConnectionState.SUSPENDED){
+ } else if (newState == ConnectionState.SUSPENDED) {
logger.warn("connection SUSPENDED to zookeeper");
}
});
@@ -85,7 +91,8 @@ public class ZookeeperOperator implements InitializingBean {
private CuratorFramework buildClient() {
logger.info("zookeeper registry center init, server lists is: {}.",
zookeeperConfig.getServerList());
- CuratorFrameworkFactory.Builder builder =
CuratorFrameworkFactory.builder().ensembleProvider(new
DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(),"zookeeper
quorum can't be null")))
+ CuratorFrameworkFactory.Builder builder =
CuratorFrameworkFactory.builder().ensembleProvider(new
DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(),
+ "zookeeper quorum can't be null")))
.retryPolicy(new
ExponentialBackoffRetry(zookeeperConfig.getBaseSleepTimeMs(),
zookeeperConfig.getMaxRetries(), zookeeperConfig.getMaxSleepMs()));
//these has default value
@@ -114,7 +121,7 @@ public class ZookeeperOperator implements InitializingBean {
try {
zkClient.blockUntilConnected();
} catch (final Exception ex) {
- throw new RuntimeException(ex);
+ throw new ServiceException(ex);
}
return zkClient;
}
@@ -138,12 +145,12 @@ public class ZookeeperOperator implements
InitializingBean {
throw new IllegalStateException(ex);
} catch (Exception ex) {
logger.error("getChildrenKeys key : {}", key, ex);
- throw new RuntimeException(ex);
+ throw new ServiceException(ex);
}
}
- public boolean hasChildren(final String key){
- Stat stat ;
+ public boolean hasChildren(final String key) {
+ Stat stat;
try {
stat = zkClient.checkExists().forPath(key);
return stat.getNumChildren() >= 1;
@@ -241,4 +248,4 @@ public class ZookeeperOperator implements InitializingBean {
public void close() {
CloseableUtils.closeQuietly(zkClient);
}
-}
\ No newline at end of file
+}