This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new c2585a1 [BUG] master task log can't output repair (#7029)
c2585a1 is described below
commit c2585a1170e30dfa9e65f040b0412288ec930441
Author: zhang <[email protected]>
AuthorDate: Tue Nov 30 11:04:32 2021 +0800
[BUG] master task log can't output repair (#7029)
* update task_definition_log field typ
update task_definition_log field task_params type,modify text to longtext
to be consistent with the main table
* update t_ds_task_definition_log field task_params
update t_ds_task_definition_log field task_params
* [FIX:211128] master task log Can't output repair
Co-authored-by: crane <[email protected]>
---
.../master/runner/task/BaseTaskProcessor.java | 28 +++++++++++++++++-----
.../master/runner/task/CommonTaskProcessor.java | 5 +---
.../master/runner/task/ConditionTaskProcessor.java | 7 +-----
.../master/runner/task/DependentTaskProcessor.java | 1 +
.../master/runner/task/SubTaskProcessor.java | 2 +-
.../master/runner/task/SwitchTaskProcessor.java | 1 +
6 files changed, 27 insertions(+), 17 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 7194ff9..3836a71 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -31,6 +31,7 @@ import
org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParamete
import
org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -97,6 +98,7 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
/**
* task timeout process
+ *
* @return
*/
protected abstract boolean taskTimeout();
@@ -105,6 +107,7 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
public void run() {
}
+
@Override
public boolean action(TaskAction taskAction) {
@@ -217,10 +220,22 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
}
/**
+ * set master task running logger.
+ */
+ public void setTaskExecutionLogger() {
+ logger =
LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
+ taskInstance.getFirstSubmitTime(),
+ processInstance.getProcessDefinitionCode(),
+ processInstance.getProcessDefinitionVersion(),
+ taskInstance.getProcessInstanceId(),
+ taskInstance.getId()));
+ }
+
+ /**
* set procedure task relation
*
* @param procedureTaskExecutionContext procedureTaskExecutionContext
- * @param taskInstance taskInstance
+ * @param taskInstance taskInstance
*/
private void setProcedureTaskRelation(ProcedureTaskExecutionContext
procedureTaskExecutionContext, TaskInstance taskInstance) {
ProcedureParameters procedureParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), ProcedureParameters.class);
@@ -233,7 +248,7 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
* set datax task relation
*
* @param dataxTaskExecutionContext dataxTaskExecutionContext
- * @param taskInstance taskInstance
+ * @param taskInstance taskInstance
*/
protected void setDataxTaskRelation(DataxTaskExecutionContext
dataxTaskExecutionContext, TaskInstance taskInstance) {
DataxParameters dataxParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), DataxParameters.class);
@@ -258,7 +273,7 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
* set sqoop task relation
*
* @param sqoopTaskExecutionContext sqoopTaskExecutionContext
- * @param taskInstance taskInstance
+ * @param taskInstance taskInstance
*/
private void setSqoopTaskRelation(SqoopTaskExecutionContext
sqoopTaskExecutionContext, TaskInstance taskInstance) {
SqoopParameters sqoopParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), SqoopParameters.class);
@@ -285,11 +300,12 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
}
}
+
/**
* set SQL task relation
*
* @param sqlTaskExecutionContext sqlTaskExecutionContext
- * @param taskInstance taskInstance
+ * @param taskInstance taskInstance
*/
private void setSQLTaskRelation(SQLTaskExecutionContext
sqlTaskExecutionContext, TaskInstance taskInstance) {
SqlParameters sqlParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), SqlParameters.class);
@@ -301,7 +317,7 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
// whether udf type
boolean udfTypeFlag = Enums.getIfPresent(UdfType.class,
Strings.nullToEmpty(sqlParameters.getType())).isPresent()
- && !StringUtils.isEmpty(sqlParameters.getUdfs());
+ && !StringUtils.isEmpty(sqlParameters.getUdfs());
if (udfTypeFlag) {
String[] udfFunIds = sqlParameters.getUdfs().split(",");
@@ -325,7 +341,7 @@ public abstract class BaseTaskProcessor implements
ITaskProcessor {
/**
* whehter tenant is null
*
- * @param tenant tenant
+ * @param tenant tenant
* @param taskInstance taskInstance
* @return result
*/
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index 7b193bf..aed6828 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -57,10 +57,6 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
@Autowired
NettyExecutorManager nettyExecutorManager =
SpringApplicationContext.getBean(NettyExecutorManager.class);
- /**
- * logger of MasterBaseTaskExecThread
- */
- protected Logger logger = LoggerFactory.getLogger(getClass());
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance,
int maxRetryTimes, int commitInterval) {
@@ -70,6 +66,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
if (this.taskInstance == null) {
return false;
}
+ setTaskExecutionLogger();
dispatchTask(taskInstance, processInstance);
return true;
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
index ee03602..2412659 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
@@ -81,12 +81,7 @@ public class ConditionTaskProcessor extends
BaseTaskProcessor {
taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion()
);
- logger =
LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
- taskInstance.getFirstSubmitTime(),
- processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion(),
- taskInstance.getProcessInstanceId(),
- taskInstance.getId()));
+ setTaskExecutionLogger();
String threadLoggerInfoName =
String.format(Constants.TASK_LOG_INFO_FORMAT,
processService.formatTaskAppId(this.taskInstance));
Thread.currentThread().setName(threadLoggerInfoName);
initTaskParameters();
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
index d873a81..0478f7e 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
@@ -93,6 +93,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor
{
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
+ setTaskExecutionLogger();
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
index 421efd3..32cbce1 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
@@ -59,7 +59,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
if (this.taskInstance == null) {
return false;
}
-
+ setTaskExecutionLogger();
return true;
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
index 3b95c43..b67dc47 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
@@ -75,6 +75,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
+ setTaskExecutionLogger();
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());