This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.1.0-prepare by this push:
     new 1a63f8672a Fix insert command error due to the id is not null (#12092) 
(#12098)
1a63f8672a is described below

commit 1a63f8672a9aa791cc50fea542ca7bba96e9cd4f
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Sep 22 15:21:07 2022 +0800

    Fix insert command error due to the id is not null (#12092) (#12098)
    
    (cherry picked from commit fba5a8eaa0945f399733275341dfdcec11ab6b3d)
---
 .../api/service/impl/ExecutorServiceImpl.java      | 111 +++++++++++++++---
 .../service/impl/ProcessDefinitionServiceImpl.java | 126 +++++++++++++++++----
 .../service/process/ProcessServiceImpl.java        |   1 +
 3 files changed, 200 insertions(+), 38 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 174da3f10c..ec878c85a3 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -198,8 +198,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
         }
 
         if (!checkTenantSuitable(processDefinition)) {
-            logger.error("there is not any valid tenant for the process 
definition: id:{},name:{}, ",
-                    processDefinition.getId(), processDefinition.getName());
+            logger.error(
+                    "There is not any valid tenant for the process definition, 
processDefinitionCode:{}, processDefinitionName:{}.",
+                    processDefinition.getCode(), processDefinition.getName());
             putMsg(result, Status.TENANT_NOT_SUITABLE);
             return result;
         }
@@ -226,8 +227,12 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
         if (create > 0) {
             processDefinition.setWarningGroupId(warningGroupId);
             processDefinitionMapper.updateById(processDefinition);
+            logger.info("Create command complete, processDefinitionCode:{}, 
commandCount:{}.",
+                    processDefinition.getCode(), create);
             putMsg(result, Status.SUCCESS);
         } else {
+            logger.error("Start process instance failed because create command 
error, processDefinitionCode:{}.",
+                    processDefinition.getCode());
             putMsg(result, Status.START_PROCESS_INSTANCE_ERROR);
         }
         return result;
@@ -288,12 +293,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
         Map<String, Object> result = new HashMap<>();
         if (processDefinition == null || projectCode != 
processDefinition.getProjectCode()) {
             // check process definition exists
+            logger.error("Process definition does not exist, projectCode:{}, 
processDefinitionCode:{}.", projectCode,
+                    processDefineCode);
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, 
String.valueOf(processDefineCode));
         } else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) 
{
             // check process definition online
+            logger.warn("Process definition is not {}, 
processDefinitionCode:{}, version:{}.",
+                    ReleaseState.ONLINE.getDescp(), processDefineCode, 
version);
             putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, 
String.valueOf(processDefineCode), version);
         } else if (!checkSubProcessDefinitionValid(processDefinition)) {
             // check sub process definition online
+            logger.warn("Subprocess definition of process definition is not 
{}, processDefinitionCode:{}.",
+                    ReleaseState.ONLINE.getDescp(), processDefineCode);
             putMsg(result, Status.SUB_PROCESS_DEFINE_NOT_RELEASE);
         } else {
             result.put(Constants.STATUS, Status.SUCCESS);
@@ -391,7 +402,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
             return result;
         }
         if (!checkTenantSuitable(processDefinition)) {
-            logger.error("there is not any valid tenant for the process 
definition: id:{},name:{}, ",
+            logger.error(
+                    "There is not any valid tenant for the process definition, 
processDefinitionId:{}, processDefinitionCode:{}, ",
                     processDefinition.getId(), processDefinition.getName());
             putMsg(result, Status.TENANT_NOT_SUITABLE);
         }
@@ -433,6 +445,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
                 break;
             case PAUSE:
                 if (processInstance.getState() == 
WorkflowExecutionStatus.READY_PAUSE) {
+                    logger.warn("Process instance status is already {}, 
processInstanceName:{}.",
+                            WorkflowExecutionStatus.READY_STOP.getDesc(), 
processInstance.getName());
                     putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, 
processInstance.getName(),
                             processInstance.getState());
                 } else {
@@ -441,7 +455,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
                 }
                 break;
             default:
-                logger.error("unknown execute type : {}", executeType);
+                logger.warn("Unknown execute type for process instance, 
processInstanceId:{}.",
+                        processInstance.getId());
                 putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown 
execute type");
 
                 break;
@@ -456,6 +471,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
         // check process instance exist
         ProcessInstance processInstance = 
processInstanceMapper.selectById(taskGroupQueue.getProcessId());
         if (processInstance == null) {
+            logger.error("Process instance does not exist, projectCode:{}, 
processInstanceId:{}.",
+                    taskGroupQueue.getProjectCode(), 
taskGroupQueue.getProcessId());
             putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, 
taskGroupQueue.getProcessId());
             return result;
         }
@@ -548,6 +565,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
 
         // determine whether the process is normal
         if (update > 0) {
+            logger.info("Process instance state is updated to {} in database, 
processInstanceName:{}.",
+                    executionStatus.getDesc(), processInstance.getName());
             // directly send the process instance state change event to target 
master, not guarantee the event send
             // success
             WorkflowStateEventChangeCommand workflowStateEventChangeCommand = 
new WorkflowStateEventChangeCommand(
@@ -612,6 +631,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
         command.setProcessInstanceId(instanceId);
 
         if (!processService.verifyIsNeedCreateCommand(command)) {
+            logger.warn(
+                    "Process instance is executing the command, 
processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
+                    processDefinitionCode, processVersion, instanceId);
             putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, 
String.valueOf(processDefinitionCode));
             return result;
         }
@@ -621,6 +643,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
         if (create > 0) {
             putMsg(result, Status.SUCCESS);
         } else {
+            logger.error(
+                    "Execute process instance failed because create {} command 
error, processDefinitionCode:{}, processDefinitionVersion:{}, 
processInstanceId:{}.",
+                    command.getCommandType().getDescp(), 
command.getProcessDefinitionCode(), processVersion,
+                    instanceId);
             putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
         }
 
@@ -655,6 +681,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
                      * if there is no online process, exit directly
                      */
                     if (processDefinitionTmp.getReleaseState() != 
ReleaseState.ONLINE) {
+                        logger.warn("Subprocess definition {} of process 
definition {} is not {}.",
+                                processDefinitionTmp.getName(),
+                                processDefinition.getName(), 
ReleaseState.ONLINE.getDescp());
                         putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, 
processDefinitionTmp.getName());
                         logger.info("not release process definition id: {} , 
name : {}", processDefinitionTmp.getId(),
                                 processDefinitionTmp.getName());
@@ -736,12 +765,16 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
         // determine whether to complement
         if (commandType == CommandType.COMPLEMENT_DATA) {
             if (schedule == null || StringUtils.isEmpty(schedule)) {
+                logger.error("Create {} type command error because parameter 
schedule is invalid.",
+                        command.getCommandType().getDescp());
                 return 0;
             }
             if (!isValidateScheduleTime(schedule)) {
                 return 0;
             }
             try {
+                logger.info("Start to create {} command, 
processDefinitionCode:{}.",
+                        command.getCommandType().getDescp(), 
processDefineCode);
                 return createComplementCommandList(schedule, runMode, command, 
expectedParallelismNumber,
                         complementDependentMode);
             } catch (CronParseException cronParseException) {
@@ -785,44 +818,65 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
         }
         switch (runMode) {
             case RUN_MODE_SERIAL: {
+                logger.info("RunMode of {} command is serial run, 
processDefinitionCode:{}.",
+                        command.getCommandType().getDescp(), 
command.getProcessDefinitionCode());
                 if (StringUtils.isNotEmpty(dateList)) {
                     cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, 
dateList);
                     command.setCommandParam(JSONUtils.toJsonString(cmdParam));
                     createCount = processService.createCommand(command);
+                    if (createCount > 0)
+                        logger.info("Create {} command complete, 
processDefinitionCode:{}",
+                                command.getCommandType().getDescp(), 
command.getProcessDefinitionCode());
+                    else
+                        logger.error("Create {} command error, 
processDefinitionCode:{}",
+                                command.getCommandType().getDescp(), 
command.getProcessDefinitionCode());
                 }
                 if (startDate != null && endDate != null) {
                     cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, 
startDate);
                     cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endDate);
                     command.setCommandParam(JSONUtils.toJsonString(cmdParam));
                     createCount = processService.createCommand(command);
-
+                    if (createCount > 0)
+                        logger.info("Create {} command complete, 
processDefinitionCode:{}",
+                                command.getCommandType().getDescp(), 
command.getProcessDefinitionCode());
+                    else
+                        logger.error("Create {} command error, 
processDefinitionCode:{}",
+                                command.getCommandType().getDescp(), 
command.getProcessDefinitionCode());
                     // dependent process definition
                     List<Schedule> schedules = 
processService.queryReleaseSchedulerListByProcessDefinitionCode(
                             command.getProcessDefinitionCode());
 
                     if (schedules.isEmpty() || complementDependentMode == 
ComplementDependentMode.OFF_MODE) {
-                        logger.info("process code: {} complement dependent in 
off mode or schedule's size is 0, skip "
-                                + "dependent complement data", 
command.getProcessDefinitionCode());
+                        logger.info(
+                                "Complement dependent mode is off mode or 
Scheduler is empty, so skip create complement dependent command, 
processDefinitionCode:{}.",
+                                command.getProcessDefinitionCode());
                     } else {
+                        logger.info(
+                                "Complement dependent mode is all dependent 
and Scheduler is not empty, need create complement dependent command, 
processDefinitionCode:{}.",
+                                command.getProcessDefinitionCode());
                         dependentProcessDefinitionCreateCount += 
createComplementDependentCommand(schedules, command);
                     }
                 }
                 break;
             }
             case RUN_MODE_PARALLEL: {
+                logger.info("RunMode of {} command is parallel run, 
processDefinitionCode:{}.",
+                        command.getCommandType().getDescp(), 
command.getProcessDefinitionCode());
                 if (startDate != null && endDate != null) {
                     List<Schedule> schedules = 
processService.queryReleaseSchedulerListByProcessDefinitionCode(
                             command.getProcessDefinitionCode());
-                    List<ZonedDateTime> listDate = new ArrayList<>(
-                            
CronUtils.getSelfFireDateList(DateUtils.stringToZoneDateTime(startDate),
-                                    DateUtils.stringToZoneDateTime(endDate), 
schedules));
+                    List<ZonedDateTime> listDate = 
CronUtils.getSelfFireDateList(
+                            DateUtils.stringToZoneDateTime(startDate),
+                            DateUtils.stringToZoneDateTime(endDate),
+                            schedules);
                     int listDateSize = listDate.size();
                     createCount = listDate.size();
                     if (!CollectionUtils.isEmpty(listDate)) {
                         if (expectedParallelismNumber != null && 
expectedParallelismNumber != 0) {
                             createCount = Math.min(createCount, 
expectedParallelismNumber);
                         }
-                        logger.info("In parallel mode, current 
expectedParallelismNumber:{}", createCount);
+                        logger.info("Complement command run in parallel mode, 
current expectedParallelismNumber:{}.",
+                                createCount);
 
                         // Distribute the number of tasks equally to each 
command.
                         // The last command with insufficient quantity will be 
assigned to the remaining tasks.
@@ -847,14 +901,21 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
                             cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE,
                                     
DateUtils.dateToString(listDate.get(endDateIndex)));
                             
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
-                            processService.createCommand(command);
-
+                            logger.info("Creating command, commandInfo:{}.", 
command);
+                            if (processService.createCommand(command) > 0)
+                                logger.info("Create {} command complete, 
processDefinitionCode:{}",
+                                        command.getCommandType().getDescp(), 
command.getProcessDefinitionCode());
+                            else
+                                logger.error("Create {} command error, 
processDefinitionCode:{}",
+                                        command.getCommandType().getDescp(), 
command.getProcessDefinitionCode());
                             if (schedules.isEmpty() || complementDependentMode 
== ComplementDependentMode.OFF_MODE) {
                                 logger.info(
-                                        "process code: {} complement dependent 
in off mode or schedule's size is 0, skip "
-                                                + "dependent complement data",
+                                        "Complement dependent mode is off mode 
or Scheduler is empty, so skip create complement dependent command, 
processDefinitionCode:{}.",
                                         command.getProcessDefinitionCode());
                             } else {
+                                logger.info(
+                                        "Complement dependent mode is all 
dependent and Scheduler is not empty, need create complement dependent command, 
processDefinitionCode:{}.",
+                                        command.getProcessDefinitionCode());
                                 dependentProcessDefinitionCreateCount +=
                                         
createComplementDependentCommand(schedules, command);
                             }
@@ -868,11 +929,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
                         if (expectedParallelismNumber != null && 
expectedParallelismNumber != 0) {
                             createCount = Math.min(createCount, 
expectedParallelismNumber);
                         }
-                        logger.info("In parallel mode, current 
expectedParallelismNumber:{}", createCount);
+                        logger.info("Complement command run in parallel mode, 
current expectedParallelismNumber:{}.",
+                                createCount);
                         for (List<String> stringDate : 
Lists.partition(listDate, createCount)) {
                             
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, String.join(COMMA, 
stringDate));
                             
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
-                            processService.createCommand(command);
+                            logger.info("Creating command, commandInfo:{}.", 
command);
+                            if (processService.createCommand(command) > 0)
+                                logger.info("Create {} command complete, 
processDefinitionCode:{}",
+                                        command.getCommandType().getDescp(), 
command.getProcessDefinitionCode());
+                            else
+                                logger.error("Create {} command error, 
processDefinitionCode:{}",
+                                        command.getCommandType().getDescp(), 
command.getProcessDefinitionCode());
                         }
                     }
                 }
@@ -993,7 +1061,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
                     return false;
                 }
                 if (start.isAfter(end)) {
-                    logger.error("complement data error, wrong date start:{} 
and end date:{} ", start, end);
+                    logger.error(
+                            "Complement data parameter error, start time 
should be before end time, startDate:{}, endDate:{}.",
+                            start, end);
                     return false;
                 }
             } catch (Exception ex) {
@@ -1034,6 +1104,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
         org.apache.dolphinscheduler.remote.command.Command command =
                 stateEventCallbackService.sendSync(host, 
requestCommand.convert2Command());
         if (command == null) {
+            logger.error("Query executing process instance from master error, 
processInstanceId:{}.",
+                    processInstanceId);
             return null;
         }
         WorkflowExecutingDataResponseCommand responseCommand =
@@ -1080,6 +1152,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl 
implements ExecutorServ
         if (response != null) {
             putMsg(result, Status.SUCCESS);
         } else {
+            logger.error(
+                    "Start to execute stream task instance error, 
projectCode:{}, taskDefinitionCode:{}, taskVersion:{}.",
+                    projectCode, taskDefinitionCode, taskDefinitionVersion);
             putMsg(result, Status.START_TASK_INSTANCE_ERROR);
         }
         return result;
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index d2e957bd58..e753fce125 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -303,7 +303,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         int insertVersion = processService.saveProcessDefine(loginUser, 
processDefinition, Boolean.TRUE, Boolean.TRUE);
         if (insertVersion == 0) {
             throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
-        }
+        } else
+            logger.info("Save process definition complete, processCode:{}, 
processVersion:{}.",
+                    processDefinition.getCode(), insertVersion);
         int insertResult = processService.saveTaskRelation(loginUser, 
processDefinition.getProjectCode(),
                 processDefinition.getCode(),
                 insertVersion, taskRelationList, taskDefinitionLogs, 
Boolean.TRUE);
@@ -679,6 +681,8 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             logger.info("The task has not changed, so skip");
         }
         if (saveTaskResult == Constants.DEFINITION_FAILURE) {
+            logger.error("Update task definitions error, projectCode:{}, 
processCode:{}.",
+                    processDefinition.getProjectCode(), 
processDefinition.getCode());
             putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
             throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
         }
@@ -710,12 +714,17 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             if (insertVersion <= 0) {
                 putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
                 throw new 
ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
-            }
+            } else
+                logger.info("Update process definition complete, 
processCode:{}, processVersion:{}.",
+                        processDefinition.getCode(), insertVersion);
 
             taskUsedInOtherTaskValid(processDefinition, taskRelationList);
             int insertResult = processService.saveTaskRelation(loginUser, 
processDefinition.getProjectCode(),
                     processDefinition.getCode(), insertVersion, 
taskRelationList, taskDefinitionLogs, Boolean.TRUE);
             if (insertResult == Constants.EXIT_CODE_SUCCESS) {
+                logger.info(
+                        "Update process task relations complete, 
projectCode:{}, processCode:{}, processVersion:{}.",
+                        processDefinition.getProjectCode(), 
processDefinition.getCode(), insertVersion);
                 putMsg(result, Status.SUCCESS);
                 result.put(Constants.DATA_LIST, processDefinition);
             } else {
@@ -724,6 +733,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             }
             saveOtherRelation(loginUser, processDefinition, result, 
otherParamsJson);
         } else {
+            logger.info(
+                    "Process definition does not need to be updated because 
there is no change, projectCode:{}, processCode:{}, processVersion:{}.",
+                    processDefinition.getProjectCode(), 
processDefinition.getCode(), processDefinition.getVersion());
             putMsg(result, Status.SUCCESS);
             result.put(Constants.DATA_LIST, processDefinition);
         }
@@ -779,6 +791,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         List<ProcessInstance> processInstances = processInstanceService
                 
.queryByProcessDefineCodeAndStatus(processDefinition.getCode(), 
Constants.NOT_TERMINATED_STATES);
         if (CollectionUtils.isNotEmpty(processInstances)) {
+            logger.warn(
+                    "Process definition can not be deleted because there are 
{} executing process instances, processDefinitionCode:{}",
+                    processInstances.size(), processDefinition.getCode());
             throw new 
ServiceException(Status.DELETE_PROCESS_DEFINITION_EXECUTING_FAIL, 
processInstances.size());
         }
 
@@ -790,6 +805,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
                     .map(task -> String.format(Constants.FORMAT_S_S_COLON, 
task.getProcessDefinitionName(),
                             task.getTaskName()))
                     .collect(Collectors.joining(Constants.COMMA));
+            logger.warn(
+                    "Process definition can not be deleted due to being 
referenced by other tasks:{}, processDefinitionCode:{}",
+                    taskDepDetail, processDefinition.getCode());
             throw new 
ServiceException(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL, 
taskDepDetail);
         }
     }
@@ -820,6 +838,8 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
 
         // Determine if the login user is the owner of the process definition
         if (loginUser.getId() != processDefinition.getUserId() && 
loginUser.getUserType() != UserType.ADMIN_USER) {
+            logger.warn("User does not have permission for process definition, 
userId:{}, processDefinitionCode:{}.",
+                    loginUser.getId(), code);
             putMsg(result, Status.USER_NO_OPERATION_PERM);
             return result;
         }
@@ -832,11 +852,17 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             if (scheduleObj.getReleaseState() == ReleaseState.OFFLINE) {
                 int delete = scheduleMapper.deleteById(scheduleObj.getId());
                 if (delete == 0) {
+                    logger.error(
+                            "Delete schedule of process definition error, 
processDefinitionCode:{}, scheduleId:{}.",
+                            code, scheduleObj.getId());
                     putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
                     throw new 
ServiceException(Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
                 }
             }
             if (scheduleObj.getReleaseState() == ReleaseState.ONLINE) {
+                logger.warn(
+                        "Process definition can not be deleted due to schedule 
{}, processDefinitionCode:{}, scheduleId:{}.",
+                        ReleaseState.ONLINE.getDescp(), 
processDefinition.getCode(), scheduleObj.getId());
                 putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, 
scheduleObj.getId());
                 return result;
             }
@@ -848,7 +874,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         }
         int deleteRelation = 
processTaskRelationMapper.deleteByCode(project.getCode(), 
processDefinition.getCode());
         if (deleteRelation == 0) {
-            logger.warn("The process definition has not relation, it will be 
delete successfully");
+            logger.warn(
+                    "The process definition has not relation, it will be 
delete successfully, processDefinitionCode:{}.",
+                    code);
         }
         deleteOtherRelation(project, result, processDefinition);
         putMsg(result, Status.SUCCESS);
@@ -897,22 +925,31 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
                 }
                 processDefinition.setReleaseState(releaseState);
                 processDefinitionMapper.updateById(processDefinition);
+                logger.info("Set process definition online, projectCode:{}, 
processDefinitionCode:{}.", projectCode,
+                        code);
                 break;
             case OFFLINE:
                 processDefinition.setReleaseState(releaseState);
                 int updateProcess = 
processDefinitionMapper.updateById(processDefinition);
                 Schedule schedule = 
scheduleMapper.queryByProcessDefinitionCode(code);
-                if (updateProcess > 0 && schedule != null) {
-                    logger.info("set schedule offline, project code: {}, 
schedule id: {}, process definition code: {}",
-                            projectCode, schedule.getId(), code);
-                    // set status
-                    schedule.setReleaseState(releaseState);
-                    int updateSchedule = scheduleMapper.updateById(schedule);
-                    if (updateSchedule == 0) {
-                        putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
-                        throw new 
ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
+                if (updateProcess > 0) {
+                    logger.info("Set process definition offline, 
projectCode:{}, processDefinitionCode:{}.",
+                            projectCode, code);
+                    if (schedule != null) {
+                        // set status
+                        schedule.setReleaseState(releaseState);
+                        int updateSchedule = 
scheduleMapper.updateById(schedule);
+                        if (updateSchedule == 0) {
+                            logger.error(
+                                    "Set schedule offline error, 
projectCode:{}, processDefinitionCode:{}, scheduleId:{}",
+                                    projectCode, code, schedule.getId());
+                            putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
+                            throw new 
ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
+                        } else
+                            logger.info("Set schedule offline, projectCode:{}, 
processDefinitionCode:{}, scheduleId:{}",
+                                    projectCode, code, schedule.getId());
+                        schedulerService.deleteSchedule(project.getId(), 
schedule.getId());
                     }
-                    schedulerService.deleteSchedule(project.getId(), 
schedule.getId());
                 }
                 break;
             default:
@@ -1272,6 +1309,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         try {
             
processDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
         } catch (CodeGenerateException e) {
+            logger.error(
+                    "Save process definition error because generate process 
definition code error, projectCode:{}.",
+                    projectCode, e);
             putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
             return false;
         }
@@ -1294,7 +1334,8 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
                 taskCodeMap.put(taskDefinitionLog.getCode(), code);
                 taskDefinitionLog.setCode(code);
             } catch (CodeGenerateException e) {
-                logger.error("Task code get error, ", e);
+                logger.error("Generate task definition code error, 
projectCode:{}, processDefinitionCode:{}",
+                        projectCode, processDefinition.getCode(), e);
                 putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error 
generating task definition code");
                 return false;
             }
@@ -1303,6 +1344,8 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         int insert = taskDefinitionMapper.batchInsert(taskDefinitionLogList);
         int logInsert = 
taskDefinitionLogMapper.batchInsert(taskDefinitionLogList);
         if ((logInsert & insert) == 0) {
+            logger.error("Save task definition error, projectCode:{}, 
processDefinitionCode:{}", projectCode,
+                    processDefinition.getCode());
             putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
             throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
         }
@@ -1345,6 +1388,8 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             putMsg(createDagResult, Status.SUCCESS);
         } else {
             result.putAll(createDagResult);
+            logger.error("Import process definition error, projectCode:{}, 
processDefinitionCode:{}.", projectCode,
+                    processDefinition.getCode());
             throw new ServiceException(Status.IMPORT_PROCESS_DEFINE_ERROR);
         }
 
@@ -1357,10 +1402,16 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             schedule.setUpdateTime(now);
             int scheduleInsert = scheduleMapper.insert(schedule);
             if (0 == scheduleInsert) {
+                logger.error(
+                        "Import process definition error due to save schedule 
fail, projectCode:{}, processDefinitionCode:{}.",
+                        projectCode, processDefinition.getCode());
                 putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR);
                 throw new ServiceException(Status.IMPORT_PROCESS_DEFINE_ERROR);
             }
         }
+
+        logger.info("Import process definition complete, projectCode:{}, 
processDefinitionCode:{}.", projectCode,
+                processDefinition.getCode());
         return true;
     }
 
@@ -1934,7 +1985,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
                     putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
                     throw new 
ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS);
                 }
-                processDefinition.setId(0);
+                processDefinition.setId(null);
                 processDefinition.setUserId(loginUser.getId());
                 
processDefinition.setName(getNewName(processDefinition.getName(), COPY_SUFFIX));
                 final Date date = new Date();
@@ -1967,6 +2018,8 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
                     result.putAll(createDagDefine(loginUser, taskRelationList, 
processDefinition, taskDefinitionLogs,
                             otherParamsJson));
                 } catch (Exception e) {
+                    logger.error("Copy process definition error, 
processDefinitionCode from {} to {}.",
+                            oldProcessDefinitionCode, 
processDefinition.getCode(), e);
                     putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR);
                     throw new 
ServiceException(Status.COPY_PROCESS_DEFINITION_ERROR);
                 }
@@ -1975,6 +2028,8 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
                     result.putAll(updateDagDefine(loginUser, taskRelationList, 
processDefinition, null,
                             Lists.newArrayList(), otherParamsJson));
                 } catch (Exception e) {
+                    logger.error("Move process definition error, 
processDefinitionCode:{}.",
+                            processDefinition.getCode(), e);
                     putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR);
                     throw new 
ServiceException(Status.MOVE_PROCESS_DEFINITION_ERROR);
                 }
@@ -2030,6 +2085,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
 
         ProcessDefinition processDefinition = 
processDefinitionMapper.queryByCode(code);
         if (Objects.isNull(processDefinition) || projectCode != 
processDefinition.getProjectCode()) {
+            logger.error(
+                    "Switch process definition error because it does not 
exist, projectCode:{}, processDefinitionCode:{}.",
+                    projectCode, code);
             putMsg(result, 
Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_ERROR, 
code);
             return result;
         }
@@ -2037,15 +2095,23 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         ProcessDefinitionLog processDefinitionLog =
                 
processDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, version);
         if (Objects.isNull(processDefinitionLog)) {
+            logger.error(
+                    "Switch process definition error because version does not 
exist, projectCode:{}, processDefinitionCode:{}, version:{}.",
+                    projectCode, code, version);
             putMsg(result, 
Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_VERSION_ERROR,
                     processDefinition.getCode(), version);
             return result;
         }
         int switchVersion = processService.switchVersion(processDefinition, 
processDefinitionLog);
         if (switchVersion <= 0) {
+            logger.error(
+                    "Switch process definition version error, projectCode:{}, 
processDefinitionCode:{}, version:{}.",
+                    projectCode, code, version);
             putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR);
             throw new 
ServiceException(Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR);
         }
+        logger.info("Switch process definition version complete, 
projectCode:{}, processDefinitionCode:{}, version:{}.",
+                projectCode, code, version);
         putMsg(result, Status.SUCCESS);
         return result;
     }
@@ -2062,14 +2128,21 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
     private void checkBatchOperateResult(long srcProjectCode, long 
targetProjectCode,
                                          Map<String, Object> result, 
List<String> failedProcessList, boolean isCopy) {
         if (!failedProcessList.isEmpty()) {
+            String failedProcess = String.join(",", failedProcessList);
             if (isCopy) {
-                putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR, 
srcProjectCode, targetProjectCode,
-                        String.join(",", failedProcessList));
+                logger.error(
+                        "Copy process definition error, srcProjectCode:{}, 
targetProjectCode:{}, failedProcessList:{}.",
+                        srcProjectCode, targetProjectCode, failedProcess);
+                putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR, 
srcProjectCode, targetProjectCode, failedProcess);
             } else {
-                putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR, 
srcProjectCode, targetProjectCode,
-                        String.join(",", failedProcessList));
+                logger.error(
+                        "Move process definition error, srcProjectCode:{}, 
targetProjectCode:{}, failedProcessList:{}.",
+                        srcProjectCode, targetProjectCode, failedProcess);
+                putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR, 
srcProjectCode, targetProjectCode, failedProcess);
             }
         } else {
+            logger.info("Batch {} process definition complete, 
srcProjectCode:{}, targetProjectCode:{}.",
+                    isCopy ? "copy" : "move", srcProjectCode, 
targetProjectCode);
             putMsg(result, Status.SUCCESS);
         }
     }
@@ -2136,16 +2209,25 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
             putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, 
String.valueOf(code));
         } else {
             if (processDefinition.getVersion() == version) {
+                logger.warn(
+                        "Process definition can not be deleted due to version 
is being used, projectCode:{}, processDefinitionCode:{}, version:{}.",
+                        projectCode, code, version);
                 putMsg(result, Status.MAIN_TABLE_USING_VERSION);
                 return result;
             }
             int deleteLog = 
processDefinitionLogMapper.deleteByProcessDefinitionCodeAndVersion(code, 
version);
             int deleteRelationLog = 
processTaskRelationLogMapper.deleteByCode(code, version);
             if (deleteLog == 0 || deleteRelationLog == 0) {
+                logger.error(
+                        "Delete process definition version error, 
projectCode:{}, processDefinitionCode:{}, version:{}.",
+                        projectCode, code, version);
                 putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
                 throw new 
ServiceException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
             }
             deleteOtherRelation(project, result, processDefinition);
+            logger.info(
+                    "Delete process definition version complete, 
projectCode:{}, processDefinitionCode:{}, version:{}.",
+                    projectCode, code, version);
             putMsg(result, Status.SUCCESS);
         }
         return result;
@@ -2255,7 +2337,8 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         Date now = new Date();
         scheduleObj.setProcessDefinitionCode(processDefinition.getCode());
         if (DateUtils.differSec(scheduleObj.getStartTime(), 
scheduleObj.getEndTime()) == 0) {
-            logger.warn("The start time must not be the same as the end");
+            logger.warn("The schedule start time must not be the same as the 
end, processDefinitionCode:{}.",
+                    processDefinition.getCode());
             putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
             return result;
         }
@@ -2476,6 +2559,9 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
                     scheduleObj.setReleaseState(ReleaseState.OFFLINE);
                     int updateSchedule = 
scheduleMapper.updateById(scheduleObj);
                     if (updateSchedule == 0) {
+                        logger.error(
+                                "Set schedule offline error, projectCode:{}, 
processDefinitionCode:{}, scheduleId:{}",
+                                projectCode, code, scheduleObj.getId());
                         putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
                         throw new 
ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
                     }
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index aa0398b209..0aad520c41 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -409,6 +409,7 @@ public class ProcessServiceImpl implements ProcessService {
                 commandParams.put(Constants.SCHEDULE_TIMEZONE, 
schedule.getTimezoneId());
                 command.setCommandParam(JSONUtils.toJsonString(commandParams));
             }
+            command.setId(null);
             result = commandMapper.insert(command);
         }
         return result;


Reply via email to