This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new fced389 [Fix][Task] Wrong complement date (#6009) (#6186)
fced389 is described below
commit fced3892ee20d64c3eafe187cbd94508ac339a98
Author: Kirs <[email protected]>
AuthorDate: Fri Sep 17 17:04:51 2021 +0800
[Fix][Task] Wrong complement date (#6009) (#6186)
* [Fix][Task] Wrong complement date (#6009)
---
.../server/worker/runner/TaskExecuteThread.java | 23 ++++++++++++++++++++++
.../plugin/task/python/PythonTask.java | 18 +++++++++++------
.../plugin/task/sqoop/SqoopTask.java | 18 +++++++++++------
3 files changed, 47 insertions(+), 12 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index b0a5625..b6ad565 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -17,6 +17,10 @@
package org.apache.dolphinscheduler.server.worker.runner;
+import static java.util.Calendar.DAY_OF_MONTH;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
@@ -163,6 +167,8 @@ public class TaskExecuteThread implements Runnable, Delayed {
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));
+ preBuildBusinessParams();
+
TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
if (null == taskChannel) {
throw new PluginNotFoundException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType()));
@@ -359,4 +365,21 @@ public class TaskExecuteThread implements Runnable, Delayed {
}
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
+
+ private void preBuildBusinessParams() {
+ Map<String, Property> paramsMap = new HashMap<>();
+ // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
+ if (taskExecutionContext.getScheduleTime() != null) {
+ Date date = taskExecutionContext.getScheduleTime();
+ if (CommandType.COMPLEMENT_DATA.getCode() == taskExecutionContext.getCmdTypeIfComplement()) {
+ date = DateUtils.add(taskExecutionContext.getScheduleTime(), DAY_OF_MONTH, 1);
+ }
+ String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME);
+ Property p = new Property();
+ p.setValue(dateTime);
+ p.setProp(Constants.PARAMETER_DATETIME);
+ paramsMap.put(Constants.PARAMETER_DATETIME, p);
+ }
+ taskExecutionContext.setParamsMap(paramsMap);
+ }
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
index 03462b8..01457da 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.python;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskResponse;
+import org.apache.dolphinscheduler.plugin.task.util.MapUtils;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.Property;
import org.apache.dolphinscheduler.spi.task.TaskConstants;
@@ -27,6 +28,8 @@ import org.apache.dolphinscheduler.spi.task.paramparser.ParamUtils;
import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import java.util.HashMap;
import java.util.Map;
/**
@@ -51,7 +54,6 @@ public class PythonTask extends AbstractTaskExecutor {
private TaskRequest taskRequest;
-
/**
* constructor
*
@@ -101,7 +103,7 @@ public class PythonTask extends AbstractTaskExecutor {
} catch (Exception e) {
logger.error("python task failure", e);
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
- throw new TaskException("run python task error",e);
+ throw new TaskException("run python task error", e);
}
}
@@ -150,6 +152,7 @@ public class PythonTask extends AbstractTaskExecutor {
/**
* build command
+ *
* @return raw python script
* @throws Exception exception
*/
@@ -157,10 +160,14 @@ public class PythonTask extends AbstractTaskExecutor {
String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
// replace placeholder
- Map<String, Property> paramsMap = ParamUtils.convert(taskRequest,pythonParameters);
- if (paramsMap != null){
- rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
+ Map<String, Property> paramsMap = ParamUtils.convert(taskRequest, pythonParameters);
+ if (MapUtils.isEmpty(paramsMap)) {
+ paramsMap = new HashMap<>();
}
+ if (MapUtils.isNotEmpty(taskRequest.getParamsMap())) {
+ paramsMap.putAll(taskRequest.getParamsMap());
+ }
+ rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
logger.info("raw python script : {}", pythonParameters.getRawScript());
logger.info("task dir : {}", taskDir);
@@ -168,5 +175,4 @@ public class PythonTask extends AbstractTaskExecutor {
return rawPythonScript;
}
-
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
index ea6de20..a5f4376 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.sqoop;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.sqoop.generator.SqoopJobGenerator;
import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters;
+import org.apache.dolphinscheduler.plugin.task.util.MapUtils;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.Property;
import org.apache.dolphinscheduler.spi.task.paramparser.ParamUtils;
@@ -27,6 +28,7 @@ import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils;
import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import java.util.HashMap;
import java.util.Map;
/**
@@ -53,7 +55,7 @@ public class SqoopTask extends AbstractYarnTask {
public void init() {
logger.info("sqoop task params {}", sqoopTaskExecutionContext.getTaskParams());
sqoopParameters =
- JSONUtils.parseObject(sqoopTaskExecutionContext.getTaskParams(), SqoopParameters.class);
+ JSONUtils.parseObject(sqoopTaskExecutionContext.getTaskParams(), SqoopParameters.class);
//check sqoop task params
if (null == sqoopParameters) {
throw new IllegalArgumentException("Sqoop Task params is null");
@@ -73,13 +75,17 @@ public class SqoopTask extends AbstractYarnTask {
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(sqoopTaskExecutionContext, getParameters());
- if (paramsMap != null) {
- String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
- logger.info("sqoop script: {}", resultScripts);
- return resultScripts;
+ if (MapUtils.isEmpty(paramsMap)) {
+ paramsMap = new HashMap<>();
}
+ if (MapUtils.isNotEmpty(sqoopTaskExecutionContext.getParamsMap())) {
+ paramsMap.putAll(sqoopTaskExecutionContext.getParamsMap());
+ }
+
+ String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
+ logger.info("sqoop script: {}", resultScripts);
+ return resultScripts;
- return null;
}
@Override