This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 165b9a58de [DS-12131][master] Optimize the log printing of the master
module acc… (#12152)
165b9a58de is described below
commit 165b9a58de4a92c9206a0c8f8f1cec85a1f4c9e8
Author: sgw <[email protected]>
AuthorDate: Sun Oct 9 13:56:50 2022 +0800
[DS-12131][master] Optimize the log printing of the master module acc…
(#12152)
* [DS-12131][master] Optimize the log printing of the master module
according to the log specification.
---
.../master/consumer/TaskPriorityQueueConsumer.java | 4 ++
.../dispatch/executor/NettyExecutorManager.java | 4 +-
.../master/event/TaskRetryStateEventHandler.java | 6 ++
.../server/master/event/TaskStateEventHandler.java | 4 ++
.../master/event/TaskTimeoutStateEventHandler.java | 2 +
.../event/TaskWaitTaskGroupStateHandler.java | 6 ++
.../event/WorkflowBlockStateEventHandler.java | 1 +
.../event/WorkflowTimeoutStateEventHandler.java | 6 ++
.../master/runner/MasterSchedulerBootstrap.java | 2 +
.../master/runner/StateWheelExecuteThread.java | 6 +-
.../master/runner/WorkflowExecuteRunnable.java | 66 ++++++++++++----------
.../master/runner/WorkflowExecuteThreadPool.java | 19 +++++--
.../master/runner/task/BaseTaskProcessor.java | 9 ++-
.../master/runner/task/CommonTaskProcessor.java | 16 +++---
.../org/apache/dolphinscheduler/rpc/RpcTest.java | 11 ----
15 files changed, 97 insertions(+), 65 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 7bff4e9d21..f073fd55f3 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -128,12 +128,14 @@ public class TaskPriorityQueueConsumer extends
BaseDaemonThread {
List<TaskPriority> failedDispatchTasks =
this.batchDispatch(fetchTaskNum);
if (CollectionUtils.isNotEmpty(failedDispatchTasks)) {
+ logger.info("{} tasks dispatch failed, will retry to
dispatch", failedDispatchTasks.size());
TaskMetrics.incTaskDispatchFailed(failedDispatchTasks.size());
for (TaskPriority dispatchFailedTask :
failedDispatchTasks) {
taskPriorityQueue.put(dispatchFailedTask);
}
// If the all task dispatch failed, will sleep for 1s to
avoid the master cpu higher.
if (fetchTaskNum == failedDispatchTasks.size()) {
+ logger.info("All tasks dispatch failed, will sleep a
while to avoid the master cpu higher");
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
@@ -209,6 +211,8 @@ public class TaskPriorityQueueConsumer extends
BaseDaemonThread {
if (isTaskNeedToCheck(taskPriority)) {
if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
// when task finish, ignore this task, there is no need to
dispatch anymore
+ logger.info("Task {} is already finished, no need to
dispatch, task instance id: {}",
+ taskInstance.getName(), taskInstance.getId());
return true;
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
index 25b8037dd6..62da82156f 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -114,7 +114,7 @@ public class NettyExecutorManager extends
AbstractExecutorManager<Boolean> {
// is not belongs to the down worker ISSUE-10842.
context.getTaskInstance().setHost(host.getAddress());
} catch (ExecuteException ex) {
- logger.error(String.format("execute command : %s error",
command), ex);
+ logger.error("Execute command {} error", command, ex);
try {
failNodeSet.add(host.getAddress());
Set<String> tmpAllIps = new HashSet<>(allNodes);
@@ -156,7 +156,7 @@ public class NettyExecutorManager extends
AbstractExecutorManager<Boolean> {
nettyRemotingClient.send(host, command);
success = true;
} catch (Exception ex) {
- logger.error(String.format("send command : %s to %s error",
command, host), ex);
+ logger.error("Send command to {} error, command: {}", host,
command, ex);
retryCount--;
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRetryStateEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRetryStateEventHandler.java
index 1ac198497f..1d5f7a3883 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRetryStateEventHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRetryStateEventHandler.java
@@ -22,17 +22,23 @@ import
org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Map;
@AutoService(StateEventHandler.class)
public class TaskRetryStateEventHandler implements StateEventHandler {
+ private static final Logger logger =
LoggerFactory.getLogger(TaskRetryStateEventHandler.class);
+
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable
workflowExecuteRunnable,
StateEvent stateEvent) throws
StateEventHandleException {
TaskStateEvent taskStateEvent = (TaskStateEvent) stateEvent;
+ logger.info("Handle task instance retry event, taskInstanceId: {}",
taskStateEvent.getTaskInstanceId());
+
TaskMetrics.incTaskInstanceByState("retry");
Map<Long, TaskInstance> waitToRetryTaskInstanceMap =
workflowExecuteRunnable.getWaitToRetryTaskInstanceMap();
TaskInstance taskInstance =
waitToRetryTaskInstanceMap.get(taskStateEvent.getTaskCode());
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
index aab5d2d6a2..935fa9db37 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
@@ -54,6 +54,10 @@ public class TaskStateEventHandler implements
StateEventHandler {
throw new StateEventHandleError("Task state event handle error due
to task state is null");
}
+ logger.info(
+ "Handle task instance state event, the current task instance
state {} will be changed to {}",
+ task.getState(), taskStateEvent.getStatus());
+
Map<Long, Integer> completeTaskMap =
workflowExecuteRunnable.getCompleteTaskMap();
if (task.getState().isFinished()) {
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
index 1678594323..9efe773b19 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
@@ -50,6 +50,8 @@ public class TaskTimeoutStateEventHandler implements
StateEventHandler {
"Cannot find the task instance from workflow execute
runnable, taskInstanceId: %s",
taskStateEvent.getTaskInstanceId())));
+ logger.info("Handle task instance state timout event, taskInstanceId:
{}", taskStateEvent.getTaskInstanceId());
+
if (TimeoutFlag.CLOSE ==
taskInstance.getTaskDefine().getTimeoutFlag()) {
return true;
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
index 9a3c59a949..8dcca91d89 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
@@ -21,11 +21,17 @@ import
org.apache.dolphinscheduler.common.enums.StateEventType;
import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import com.google.auto.service.AutoService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@AutoService(StateEventHandler.class)
public class TaskWaitTaskGroupStateHandler implements StateEventHandler {
+
+ private static final Logger logger =
LoggerFactory.getLogger(TaskWaitTaskGroupStateHandler.class);
+
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable
workflowExecuteRunnable, StateEvent stateEvent) {
+ logger.info("Handle task instance wait task group event,
taskInstanceId: {}", stateEvent.getTaskInstanceId());
return workflowExecuteRunnable.checkForceStartAndWakeUp(stateEvent);
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowBlockStateEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowBlockStateEventHandler.java
index f7349fcbd1..cac30a8072 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowBlockStateEventHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowBlockStateEventHandler.java
@@ -38,6 +38,7 @@ public class WorkflowBlockStateEventHandler implements
StateEventHandler {
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable
workflowExecuteRunnable, StateEvent stateEvent)
throws StateEventHandleError {
+ logger.info("Handle workflow instance state block event");
Optional<TaskInstance> taskInstanceOptional =
workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId());
if (!taskInstanceOptional.isPresent()) {
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java
index b04866a76a..4a9fd99392 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java
@@ -22,11 +22,17 @@ import
org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import com.google.auto.service.AutoService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@AutoService(StateEventHandler.class)
public class WorkflowTimeoutStateEventHandler implements StateEventHandler {
+
+ private static final Logger logger =
LoggerFactory.getLogger(WorkflowTimeoutStateEventHandler.class);
+
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable
workflowExecuteRunnable, StateEvent stateEvent) {
+ logger.info("Handle workflow instance timeout event");
ProcessInstanceMetrics.incProcessInstanceByState("timeout");
workflowExecuteRunnable.processTimeout();
return true;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index 6a3a12e1e4..351ebe7c17 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -136,12 +136,14 @@ public class MasterSchedulerBootstrap extends
BaseDaemonThread implements AutoCl
try {
if (!ServerLifeCycleManager.isRunning()) {
// the current server is not at running status, cannot
consume command.
+ logger.warn("The current server {} is not at running
status, cannot consumes commands.", this.masterAddress);
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
// todo: if the workflow event queue is much, we need to
handle the back pressure
boolean isOverload =
OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(),
masterConfig.getReservedMemory());
if (isOverload) {
+ logger.warn("The current server {} is overload, cannot
consumes commands.", this.masterAddress);
MasterServerMetrics.incMasterOverload();
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
index 2a19b1ca8d..8a4b195a5b 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
@@ -120,13 +120,13 @@ public class StateWheelExecuteThread extends
BaseDaemonThread {
public void addProcess4TimeoutCheck(ProcessInstance processInstance) {
processInstanceTimeoutCheckList.add(processInstance.getId());
- logger.info("Success add workflow instance into timeout check list");
+ logger.info("Success add workflow instance {} into timeout check
list", processInstance.getId());
}
public void removeProcess4TimeoutCheck(int processInstanceId) {
boolean removeFlag =
processInstanceTimeoutCheckList.remove(processInstanceId);
if (removeFlag) {
- logger.info("Success remove workflow instance from timeout check
list");
+ logger.info("Success remove workflow instance {} from timeout
check list", processInstanceId);
}
}
@@ -154,7 +154,7 @@ public class StateWheelExecuteThread extends
BaseDaemonThread {
(long) processInstance.getTimeout()
* Constants.SEC_2_MINUTES_TIME_UNIT);
if (timeRemain < 0) {
- logger.info("Workflow instance timeout, adding timeout
event");
+ logger.info("Workflow instance {} timeout, adding timeout
event", processInstance.getId());
addProcessTimeoutEvent(processInstance);
processInstanceTimeoutCheckList.remove(processInstance.getId());
logger.info("Workflow instance timeout, added timeout
event");
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 7716314444..14b88b773b 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -457,7 +457,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
}
TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance);
if (newTaskInstance == null) {
- logger.error("retry fail, new taskInstance is null, task code:{},
task id:{}",
+ logger.error("Retry task fail because new taskInstance is null,
task code:{}, task id:{}",
taskInstance.getTaskCode(),
taskInstance.getId());
return;
@@ -465,7 +465,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(),
newTaskInstance);
if (!taskInstance.retryTaskIntervalOverTime()) {
logger.info(
- "failure task will be submitted: process id: {}, task
instance code: {} state:{} retry times:{} / {}, interval:{}",
+ "Failure task will be submitted, process id: {}, task
instance code: {}, state: {}, retry times: {} / {}, interval: {}",
processInstance.getId(), newTaskInstance.getTaskCode(),
newTaskInstance.getState(),
newTaskInstance.getRetryTimes(), newTaskInstance.getMaxRetryTimes(),
newTaskInstance.getRetryInterval());
@@ -789,7 +789,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
ProcessDag processDag = generateFlowDag(taskNodeList,
startNodeNameList, recoveryNodeCodeList,
processInstance.getTaskDependType());
if (processDag == null) {
- logger.error("processDag is null");
+ logger.error("ProcessDag is null");
return;
}
// generate process dag
@@ -822,6 +822,8 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
task.getTaskCode(),
task.getState());
if (validTaskMap.containsKey(task.getTaskCode())) {
+ logger.warn("Have same taskCode taskInstance when init
task queue, need to check taskExecutionStatus, taskCode:{}",
+ task.getTaskCode());
int oldTaskInstanceId =
validTaskMap.get(task.getTaskCode());
TaskInstance oldTaskInstance =
taskInstanceMap.get(oldTaskInstanceId);
if (!oldTaskInstance.getState().isFinished() &&
task.getState().isFinished()) {
@@ -829,14 +831,13 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
processService.updateTaskInstance(task);
continue;
}
- logger.warn("have same taskCode taskInstance when init
task queue, taskCode:{}",
- task.getTaskCode());
}
validTaskMap.put(task.getTaskCode(), task.getId());
taskInstanceMap.put(task.getId(), task);
if (task.isTaskComplete()) {
+ logger.info("TaskInstance is already complete.");
completeTaskMap.put(task.getTaskCode(), task.getId());
continue;
}
@@ -846,6 +847,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
}
if (task.taskCanRetry()) {
if (task.getState().isNeedFaultTolerance()) {
+ logger.info("TaskInstance needs fault tolerance,
will be added to standby list.");
task.setFlag(Flag.NO);
processService.updateTaskInstance(task);
@@ -853,6 +855,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
TaskInstance tolerantTaskInstance =
cloneTolerantTaskInstance(task);
addTaskToStandByList(tolerantTaskInstance);
} else {
+ logger.info("Retry taskInstance, taskState: {}",
task.getState());
retryTaskInstance(task);
}
continue;
@@ -934,15 +937,14 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
boolean submit = taskProcessor.action(TaskAction.SUBMIT);
if (!submit) {
- logger.error("process id:{} name:{} submit standby task id:{}
name:{} failed!",
- processInstance.getId(),
- processInstance.getName(),
- taskInstance.getId(),
+ logger.error("Submit standby task failed!, taskCode: {},
taskName: {}",
+ taskInstance.getTaskCode(),
taskInstance.getName());
return Optional.empty();
}
// in a dag, only one taskInstance is valid per taskCode, so need
to set the old taskInstance invalid
+
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(),
taskInstance.getId());
if (validTaskMap.containsKey(taskInstance.getTaskCode())) {
int oldTaskInstanceId =
validTaskMap.get(taskInstance.getTaskCode());
if (taskInstance.getId() != oldTaskInstanceId) {
@@ -969,19 +971,16 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
taskInstance.getProcessInstanceId(),
taskInstance.getTaskGroupPriority());
if (!acquireTaskGroup) {
- logger.info("submit task name :{}, but the first time to
try to acquire task group failed",
- taskInstance.getName());
+ logger.info("Submitted task will not be dispatch right now
because the first time to try to acquire" +
+ " task group failed, taskInstanceName: {},
taskGroupId: {}",
+ taskInstance.getName(), taskGroupId);
return Optional.of(taskInstance);
}
}
boolean dispatchSuccess =
taskProcessor.action(TaskAction.DISPATCH);
if (!dispatchSuccess) {
- logger.error("process id:{} name:{} dispatch standby task
id:{} name:{} failed!",
- processInstance.getId(),
- processInstance.getName(),
- taskInstance.getId(),
- taskInstance.getName());
+ logger.error("Dispatch standby process {} task {} failed",
processInstance.getName(), taskInstance.getName());
return Optional.empty();
}
taskProcessor.action(TaskAction.RUN);
@@ -1009,11 +1008,11 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
}
return Optional.of(taskInstance);
} catch (Exception e) {
- logger.error("submit standby task error, taskCode: {},
taskInstanceId: {}",
- taskInstance.getTaskCode(),
- taskInstance.getId(),
- e);
+ logger.error("Submit standby task {} error, taskCode: {}",
taskInstance.getName(),
+ taskInstance.getTaskCode(), e);
return Optional.empty();
+ } finally {
+ LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
@@ -1076,7 +1075,8 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
public TaskInstance cloneRetryTaskInstance(TaskInstance taskInstance) {
TaskNode taskNode =
dag.getNode(Long.toString(taskInstance.getTaskCode()));
if (taskNode == null) {
- logger.error("taskNode is null, code:{}",
taskInstance.getTaskCode());
+ logger.error("Clone retry taskInstance error because taskNode is
null, taskCode:{}",
+ taskInstance.getTaskCode());
return null;
}
TaskInstance newTaskInstance = newTaskInstance(processInstance,
taskNode);
@@ -1103,7 +1103,8 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
public TaskInstance cloneTolerantTaskInstance(TaskInstance taskInstance) {
TaskNode taskNode =
dag.getNode(Long.toString(taskInstance.getTaskCode()));
if (taskNode == null) {
- logger.error("taskNode is null, code:{}",
taskInstance.getTaskCode());
+ logger.error("Clone tolerant taskInstance error because taskNode
is null, taskCode:{}",
+ taskInstance.getTaskCode());
return null;
}
TaskInstance newTaskInstance = newTaskInstance(processInstance,
taskNode);
@@ -1347,11 +1348,11 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
}
if (task.getId() != null &&
completeTaskMap.containsKey(task.getTaskCode())) {
- logger.info("task {} has already run success", task.getName());
+ logger.info("Task has already run success, taskName: {}",
task.getName());
continue;
}
if (task.getState().isKill()) {
- logger.info("task {} stopped, the state is {}",
task.getName(), task.getState());
+ logger.info("Task is be stopped, the state is {},
taskInstanceId: {}", task.getState(), task.getId());
continue;
}
@@ -1403,8 +1404,8 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
}
}
}
- logger.info("taskCode: {} completeDependTaskList: {}", taskCode,
- Arrays.toString(completeTaskMap.keySet().toArray()));
+ logger.info("The dependTasks of task all success, currentTaskCode: {},
dependTaskCodes: {}",
+ taskCode, Arrays.toString(completeTaskMap.keySet().toArray()));
return DependResult.SUCCESS;
}
@@ -1436,6 +1437,9 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
List<String> nextTaskList =
DagHelper.parseConditionTask(dependNodeName,
skipTaskNodeMap, dag, getCompleteTaskInstanceMap());
if (!nextTaskList.contains(nextNodeName)) {
+ logger.info("DependTask is a condition task, and its next
condition branch does not hava current task, " +
+ "dependTaskCode: {}, currentTaskCode: {}",
dependNodeName, nextNodeName
+ );
return false;
}
} else {
@@ -1719,10 +1723,11 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
*/
public void addTaskToStandByList(TaskInstance taskInstance) {
if (readyToSubmitTaskQueue.contains(taskInstance)) {
- logger.warn("task was found in ready submit queue, task code:{}",
taskInstance.getTaskCode());
+ logger.warn("Task already exists in ready submit queue, no need to
add again, task code:{}",
+ taskInstance.getTaskCode());
return;
}
- logger.info("add task to stand by list, task name:{}, task id:{}, task
code:{}",
+ logger.info("Add task to stand by list, task name:{}, task id:{}, task
code:{}",
taskInstance.getName(),
taskInstance.getId(),
taskInstance.getTaskCode());
@@ -1807,8 +1812,8 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
TaskInstance retryTask =
processService.findTaskInstanceById(task.getId());
if (retryTask != null &&
retryTask.getState().isForceSuccess()) {
task.setState(retryTask.getState());
- logger.info("task: {} has been forced success, put it into
complete task list and stop retrying",
- task.getName());
+ logger.info("Task {} has been forced success, put it into
complete task list and stop retrying, taskInstanceId: {}",
+ task.getName(), task.getId());
removeTaskFromStandbyList(task);
completeTaskMap.put(task.getTaskCode(), task.getId());
taskInstanceMap.put(task.getId(), task);
@@ -1824,6 +1829,7 @@ public class WorkflowExecuteRunnable implements
Callable<WorkflowSubmitStatue> {
}
DependResult dependResult = getDependResultForTask(task);
if (DependResult.SUCCESS == dependResult) {
+ logger.info("The dependResult of task {} is success, so ready
to submit to execute", task.getName());
Optional<TaskInstance> taskInstanceOptional =
submitTaskExec(task);
if (!taskInstanceOptional.isPresent()) {
this.taskFailedSubmit = true;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
index 5d0e237027..84e784e6c9 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
@@ -158,10 +158,17 @@ public class WorkflowExecuteThreadPool extends
ThreadPoolTaskExecutor {
ProcessInstance processInstance = entry.getKey();
TaskInstance taskInstance = entry.getValue();
String address = NetUtils.getAddr(masterConfig.getListenPort());
- if (processInstance.getHost().equalsIgnoreCase(address)) {
- this.notifyMyself(processInstance, taskInstance);
- } else {
- this.notifyProcess(finishProcessInstance, processInstance,
taskInstance);
+ try {
+
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(processInstance.getId(),
taskInstance.getId());
+ if (processInstance.getHost().equalsIgnoreCase(address)) {
+ logger.info("Process host is local master, will notify
it");
+ this.notifyMyself(processInstance, taskInstance);
+ } else {
+ logger.info("Process host is remote master, will notify
it");
+ this.notifyProcess(finishProcessInstance, processInstance,
taskInstance);
+ }
+ } finally {
+ LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
}
@@ -190,8 +197,8 @@ public class WorkflowExecuteThreadPool extends
ThreadPoolTaskExecutor {
TaskInstance taskInstance) {
String processInstanceHost = processInstance.getHost();
if (Strings.isNullOrEmpty(processInstanceHost)) {
- logger.error("process {} host is empty, cannot notify task {}
now", processInstance.getId(),
- taskInstance.getId());
+ logger.error("Process {} host is empty, cannot notify task {} now,
taskId: {}", processInstance.getName(),
+ taskInstance.getName(), taskInstance.getId());
return;
}
WorkflowStateEventChangeCommand workflowStateEventChangeCommand = new
WorkflowStateEventChangeCommand(
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index a67ee628ea..66a5b493a1 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -304,6 +304,7 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
// verify tenant is null
if (verifyTenantIsNull(tenant, taskInstance)) {
+ logger.info("Task state changes to {}",
TaskExecutionStatus.FAILURE);
taskInstance.setState(TaskExecutionStatus.FAILURE);
processService.saveTaskInstance(taskInstance);
return null;
@@ -418,7 +419,7 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
int ruleId = dataQualityParameters.getRuleId();
DqRule dqRule = processService.getDqRule(ruleId);
if (dqRule == null) {
- logger.error("can not get DqRule by id {}", ruleId);
+ logger.error("Can not get dataQuality rule by id {}", ruleId);
return;
}
@@ -428,7 +429,7 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
List<DqRuleInputEntry> ruleInputEntryList =
processService.getRuleInputEntry(ruleId);
if (CollectionUtils.isEmpty(ruleInputEntryList)) {
- logger.error("{} rule input entry list is empty ", ruleId);
+ logger.error("Rule input entry list is empty, ruleId: {}", ruleId);
return;
}
List<DqRuleExecuteSql> executeSqlList =
processService.getDqExecuteSql(ruleId);
@@ -603,9 +604,7 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
*/
protected boolean verifyTenantIsNull(Tenant tenant, TaskInstance
taskInstance) {
if (tenant == null) {
- logger.error("tenant not exists,process instance id : {},task
instance id : {}",
- taskInstance.getProcessInstance().getId(),
- taskInstance.getId());
+ logger.error("Tenant does not exists");
return true;
}
return false;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index 525c0324ad..fc4c4c337d 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -102,18 +102,18 @@ public class CommonTaskProcessor extends
BaseTaskProcessor {
this.initQueue();
}
if (taskInstance.getState().isFinished()) {
- logger.info("submit task , but task [{}] state [{}] is already
finished. ", taskInstance.getName(),
- taskInstance.getState());
+ logger.info("Task {} has already finished, no need to submit
to task queue, taskState: {}",
+ taskInstance.getName(), taskInstance.getState());
return true;
}
// task cannot be submitted because its execution state is RUNNING
or DELAY.
if (taskInstance.getState() ==
TaskExecutionStatus.RUNNING_EXECUTION
|| taskInstance.getState() ==
TaskExecutionStatus.DELAY_EXECUTION) {
- logger.info("submit task, but the status of the task {} is
already running or delayed.",
- taskInstance.getName());
+ logger.info("Task {} is already running or delayed, no need to
submit to task queue, taskState: {}",
+ taskInstance.getName(), taskInstance.getState());
return true;
}
- logger.info("task ready to dispatch to worker: taskInstanceId:
{}", taskInstance.getId());
+ logger.info("Task {} is ready to dispatch to worker",
taskInstance.getName());
TaskPriority taskPriority = new
TaskPriority(processInstance.getProcessInstancePriority().getCode(),
processInstance.getId(),
taskInstance.getProcessInstancePriority().getCode(),
@@ -122,17 +122,17 @@ public class CommonTaskProcessor extends
BaseTaskProcessor {
TaskExecutionContext taskExecutionContext =
getTaskExecutionContext(taskInstance);
if (taskExecutionContext == null) {
- logger.error("task get taskExecutionContext fail: {}",
taskInstance);
+ logger.error("Get taskExecutionContext fail, task: {}",
taskInstance);
return false;
}
taskPriority.setTaskExecutionContext(taskExecutionContext);
taskUpdateQueue.put(taskPriority);
- logger.info("Master submit task to priority queue success,
taskInstanceId : {}", taskInstance.getId());
+ logger.info("Task {} is submitted to priority queue success by
master", taskInstance.getName());
return true;
} catch (Exception e) {
- logger.error("submit task error", e);
+ logger.error("Task {} is submitted to priority queue error",
taskInstance.getName(), e);
return false;
}
}
diff --git
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/RpcTest.java
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/RpcTest.java
index bd4211ea1a..e216dbc95f 100644
---
a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/RpcTest.java
+++
b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/rpc/RpcTest.java
@@ -44,17 +44,6 @@ public class RpcTest {
userService = rpcClient.create(IUserService.class, host);
}
- @Test
- public void sendTest() {
- Integer result = userService.hi(3);
- Assert.assertSame(4, result);
- result = userService.hi(4);
- Assert.assertSame(5, result);
- userService.say("sync");
- userService.callBackIsFalse("async no call back");
- userService.hi(999999);
- }
-
@After
public void after() {
NettyClient.getInstance().close();