This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 2486451 remove AbstractTask.getCurTaskParamsClass() and replace with
TaskParametersUtils.getParameters() (#4262)
2486451 is described below
commit 24864512212a94f53da32eb20dc56673f960547a
Author: Tq <[email protected]>
AuthorDate: Sun Dec 20 21:52:52 2020 +0800
remove AbstractTask.getCurTaskParamsClass() and replace with
TaskParametersUtils.getParameters() (#4262)
* remove getCurTaskParamsClass() in AbstractTask.java and replace it with
TaskParametersUtils.getParameters()
* remove unused imports in AbstractTask.java
* reformat
---
.../server/worker/task/AbstractTask.java | 110 ++++++---------------
1 file changed, 31 insertions(+), 79 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
index fe60f4a..de7d35f 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
@@ -16,6 +16,8 @@
*/
package org.apache.dolphinscheduler.server.worker.task;
+import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@@ -23,27 +25,17 @@ import
org.apache.dolphinscheduler.common.enums.TaskRecordStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
-import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
-import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
-import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
-import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
-import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
-import org.apache.dolphinscheduler.common.task.python.PythonParameters;
-import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
-import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
-import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
-import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.TaskRecordDao;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
+
import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
import java.util.List;
import java.util.Map;
-import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
+import org.slf4j.Logger;
/**
* executive task
@@ -61,13 +53,13 @@ public abstract class AbstractTask {
TaskExecutionContext taskExecutionContext;
/**
- * log record
+ * log record
*/
protected Logger logger;
/**
- * SHELL process pid
+ * SHELL process pid
*/
protected int processId;
@@ -83,14 +75,15 @@ public abstract class AbstractTask {
protected volatile boolean cancel = false;
/**
- * exit code
+ * exit code
*/
protected volatile int exitStatusCode = -1;
/**
* constructor
+ *
* @param taskExecutionContext taskExecutionContext
- * @param logger logger
+ * @param logger logger
*/
protected AbstractTask(TaskExecutionContext taskExecutionContext, Logger
logger) {
this.taskExecutionContext = taskExecutionContext;
@@ -99,6 +92,7 @@ public abstract class AbstractTask {
/**
* init task
+ *
* @throws Exception exception
*/
public void init() throws Exception {
@@ -106,6 +100,7 @@ public abstract class AbstractTask {
/**
* task handle
+ *
* @throws Exception exception
*/
public abstract void handle() throws Exception;
@@ -113,6 +108,7 @@ public abstract class AbstractTask {
/**
* cancel application
+ *
* @param status status
* @throws Exception exception
*/
@@ -122,6 +118,7 @@ public abstract class AbstractTask {
/**
* log handle
+ *
* @param logs log list
*/
public void logHandle(List<String> logs) {
@@ -136,13 +133,15 @@ public abstract class AbstractTask {
public void setVarPool(String varPool) {
this.varPool = varPool;
}
+
public String getVarPool() {
return varPool;
}
/**
* get exit status code
- * @return exit status code
+ *
+ * @return exit status code
*/
public int getExitStatusCode() {
return exitStatusCode;
@@ -170,21 +169,21 @@ public abstract class AbstractTask {
/**
* get task parameters
+ *
* @return AbstractParameters
*/
public abstract AbstractParameters getParameters();
-
/**
* result processing
*/
- public void after(){
- if (getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){
+ public void after() {
+ if (getExitStatusCode() == Constants.EXIT_CODE_SUCCESS) {
// task recor flat : if true , start up qianfan
if (TaskRecordDao.getTaskRecordFlag()
- &&
TaskType.typeIsNormalTask(taskExecutionContext.getTaskType())){
- AbstractParameters params = (AbstractParameters)
JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
getCurTaskParamsClass());
+ &&
TaskType.typeIsNormalTask(taskExecutionContext.getTaskType())) {
+ AbstractParameters params =
TaskParametersUtils.getParameters(taskExecutionContext.getTaskType(),
taskExecutionContext.getTaskParams());
// replace placeholder
Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
@@ -193,81 +192,34 @@ public abstract class AbstractTask {
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
if (paramsMap != null && !paramsMap.isEmpty()
- && paramsMap.containsKey("v_proc_date")){
+ && paramsMap.containsKey("v_proc_date")) {
String vProcDate = paramsMap.get("v_proc_date").getValue();
- if (!StringUtils.isEmpty(vProcDate)){
+ if (!StringUtils.isEmpty(vProcDate)) {
TaskRecordStatus taskRecordState =
TaskRecordDao.getTaskRecordState(taskExecutionContext.getTaskName(), vProcDate);
- logger.info("task record status : {}",taskRecordState);
- if (taskRecordState == TaskRecordStatus.FAILURE){
+ logger.info("task record status : {}",
taskRecordState);
+ if (taskRecordState == TaskRecordStatus.FAILURE) {
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
}
}
}
}
- }else if (getExitStatusCode() == Constants.EXIT_CODE_KILL){
+ } else if (getExitStatusCode() == Constants.EXIT_CODE_KILL) {
setExitStatusCode(Constants.EXIT_CODE_KILL);
- }else {
+ } else {
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
}
}
-
-
- /**
- * get current task parameter class
- * @return Task Params Class
- */
- private Class getCurTaskParamsClass(){
- Class paramsClass = null;
- // get task type
- TaskType taskType =
TaskType.valueOf(taskExecutionContext.getTaskType());
- switch (taskType){
- case SHELL:
- paramsClass = ShellParameters.class;
- break;
- case SQL:
- paramsClass = SqlParameters.class;
- break;
- case PROCEDURE:
- paramsClass = ProcedureParameters.class;
- break;
- case MR:
- paramsClass = MapreduceParameters.class;
- break;
- case SPARK:
- paramsClass = SparkParameters.class;
- break;
- case FLINK:
- paramsClass = FlinkParameters.class;
- break;
- case PYTHON:
- paramsClass = PythonParameters.class;
- break;
- case DATAX:
- paramsClass = DataxParameters.class;
- break;
- case SQOOP:
- paramsClass = SqoopParameters.class;
- break;
- case CONDITIONS:
- paramsClass = ConditionsParameters.class;
- break;
- default:
- logger.error("not support this task type: {}", taskType);
- throw new IllegalArgumentException("not support this task
type");
- }
- return paramsClass;
- }
-
/**
* get exit status according to exitCode
+ *
* @return exit status
*/
- public ExecutionStatus getExitStatus(){
+ public ExecutionStatus getExitStatus() {
ExecutionStatus status;
- switch (getExitStatusCode()){
+ switch (getExitStatusCode()) {
case Constants.EXIT_CODE_SUCCESS:
status = ExecutionStatus.SUCCESS;
break;