This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 4a68bfb [Improvement-5852][server] Support two parameters related to
task for the rest of type of tasks. (#5867)
4a68bfb is described below
commit 4a68bfbe1c816f9b1f2f43c1f01218689b57ed6f
Author: Hua Jiang <[email protected]>
AuthorDate: Wed Jul 21 18:36:20 2021 +0800
[Improvement-5852][server] Support two parameters related to task for the
rest of type of tasks. (#5867)
* provide two system parameters to support the rest of type of tasks
* provide two system parameters to support the rest of type of tasks
* improve test conversion
---
.../dolphinscheduler/server/utils/ParamUtils.java | 73 ++++------------------
.../server/worker/task/datax/DataxTask.java | 10 +--
.../server/worker/task/flink/FlinkTask.java | 10 +--
.../server/worker/task/http/HttpTask.java | 11 +---
.../server/worker/task/mr/MapReduceTask.java | 10 +--
.../worker/task/procedure/ProcedureTask.java | 8 +--
.../server/worker/task/python/PythonTask.java | 18 +++---
.../server/worker/task/shell/ShellTask.java | 5 +-
.../server/worker/task/spark/SparkTask.java | 10 +--
.../server/worker/task/sql/SqlTask.java | 12 +---
.../server/worker/task/sqoop/SqoopTask.java | 9 +--
.../dolphinscheduler/server/master/ParamsTest.java | 49 +--------------
.../server/utils/ParamUtilsTest.java | 63 ++++++++++++++-----
13 files changed, 85 insertions(+), 203 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
index cbf663f..57abf0b 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
@@ -43,69 +43,15 @@ public class ParamUtils {
/**
* parameter conversion
- * @param globalParams global params
- * @param globalParamsMap global params map
- * @param localParams local params
- * @param commandType command type
- * @param scheduleTime schedule time
- * @return global params
- */
- public static Map<String,Property> convert(Map<String,Property>
globalParams,
- Map<String,String>
globalParamsMap,
-
Map<String,Property> localParams,
-
Map<String,Property> varParams,
- CommandType
commandType,
- Date scheduleTime) {
- if (globalParams == null && localParams == null) {
- return null;
- }
- // if it is a complement,
- // you need to pass in the task instance id to locate the time
- // of the process instance complement
- Map<String,String> timeParams = BusinessTimeUtils
- .getBusinessTime(commandType,
- scheduleTime);
-
- if (globalParamsMap != null) {
- timeParams.putAll(globalParamsMap);
- }
-
- if (globalParams != null && localParams != null) {
- localParams.putAll(globalParams);
- globalParams = localParams;
- } else if (globalParams == null && localParams != null) {
- globalParams = localParams;
- }
- if (varParams != null) {
- varParams.putAll(globalParams);
- globalParams = varParams;
- }
- Iterator<Map.Entry<String, Property>> iter =
globalParams.entrySet().iterator();
- while (iter.hasNext()) {
- Map.Entry<String, Property> en = iter.next();
- Property property = en.getValue();
-
- if (StringUtils.isNotEmpty(property.getValue())
- && property.getValue().startsWith("$")) {
- /**
- * local parameter refers to global parameter with the same
name
- * note: the global parameters of the process instance here
are solidified parameters,
- * and there are no variables in them.
- */
- String val = property.getValue();
- val = ParameterUtils.convertParameterPlaceholders(val,
timeParams);
- property.setValue(val);
- }
- }
-
- return globalParams;
- }
-
- /**
- * parameter conversion
+ * Warning:
+ * When you first invoke the function of convert, the variables of
localParams and varPool in the ShellParameters will be modified.
+ * But in the whole system the variables of localParams and varPool have
been used in other functions. I'm not sure if this current
+ * situation is wrong. So I cannot modify the original logic.
+ *
* @param taskExecutionContext the context of this task instance
* @param parameters the parameters
* @return global params
+ *
*/
public static Map<String,Property> convert(TaskExecutionContext
taskExecutionContext, AbstractParameters parameters) {
Preconditions.checkNotNull(taskExecutionContext);
@@ -115,8 +61,11 @@ public class ParamUtils {
CommandType commandType =
CommandType.of(taskExecutionContext.getCmdTypeIfComplement());
Date scheduleTime = taskExecutionContext.getScheduleTime();
+ // combining local and global parameters
Map<String,Property> localParams = parameters.getLocalParametersMap();
+ Map<String,Property> varParams = parameters.getVarPoolMap();
+
if (globalParams == null && localParams == null) {
return null;
}
@@ -141,6 +90,10 @@ public class ParamUtils {
} else if (globalParams == null && localParams != null) {
globalParams = localParams;
}
+ if (varParams != null) {
+ varParams.putAll(globalParams);
+ globalParams = varParams;
+ }
Iterator<Map.Entry<String, Property>> iter =
globalParams.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, Property> en = iter.next();
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
index b785cb5..84b2c27 100755
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.worker.task.datax;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.common.datasource.DatasourceUtil;
-import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.process.Property;
@@ -154,13 +153,8 @@ public class DataxTask extends AbstractTask {
String threadLoggerInfoName = String.format("TaskLogInfo-%s",
taskExecutionContext.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName);
- // combining local and global parameters
- Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
- taskExecutionContext.getDefinedParams(),
- dataXParameters.getLocalParametersMap(),
- dataXParameters.getVarPoolMap(),
-
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
- taskExecutionContext.getScheduleTime());
+ // replace placeholder,and combine local and global parameters
+ Map<String, Property> paramsMap =
ParamUtils.convert(taskExecutionContext,getParameters());
// run datax procesDataSourceService.s
String jsonFilePath = buildDataxJsonFile(paramsMap);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
index 27e5b42..863b91a 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.task.flink;
-import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
@@ -80,13 +79,8 @@ public class FlinkTask extends AbstractYarnTask {
if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) {
String args = flinkParameters.getMainArgs();
- // replace placeholder
- Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
- taskExecutionContext.getDefinedParams(),
- flinkParameters.getLocalParametersMap(),
- flinkParameters.getVarPoolMap(),
-
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
- taskExecutionContext.getScheduleTime());
+ // combining local and global parameters
+ Map<String, Property> paramsMap =
ParamUtils.convert(taskExecutionContext,getParameters());
logger.info("param Map : {}", paramsMap);
if (paramsMap != null) {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
index 01ac50b..4e34741 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.worker.task.http;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.HttpMethod;
import org.apache.dolphinscheduler.common.enums.HttpParametersType;
import org.apache.dolphinscheduler.common.process.HttpProperty;
@@ -137,13 +136,9 @@ public class HttpTask extends AbstractTask {
protected CloseableHttpResponse sendRequest(CloseableHttpClient client)
throws IOException {
RequestBuilder builder = createRequestBuilder();
- // replace placeholder
- Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
- taskExecutionContext.getDefinedParams(),
- httpParameters.getLocalParametersMap(),
- httpParameters.getVarPoolMap(),
- CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
- taskExecutionContext.getScheduleTime());
+ // replace placeholder,and combine local and global parameters
+ Map<String, Property> paramsMap =
ParamUtils.convert(taskExecutionContext,getParameters());
+
List<HttpProperty> httpPropertyList = new ArrayList<>();
if (CollectionUtils.isNotEmpty(httpParameters.getHttpParams())) {
for (HttpProperty httpProperty : httpParameters.getHttpParams()) {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
index ce908df..5e8f3ca 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.worker.task.mr;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
@@ -84,13 +83,8 @@ public class MapReduceTask extends AbstractYarnTask {
mapreduceParameters.setQueue(taskExecutionContext.getQueue());
setMainJarName();
- // replace placeholder
- Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
- taskExecutionContext.getDefinedParams(),
- mapreduceParameters.getLocalParametersMap(),
- mapreduceParameters.getVarPoolMap(),
- CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
- taskExecutionContext.getScheduleTime());
+ // replace placeholder,and combine local and global parameters
+ Map<String, Property> paramsMap =
ParamUtils.convert(taskExecutionContext,getParameters());
if (paramsMap != null) {
String args =
ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(),
ParamUtils.convert(paramsMap));
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java
index 3748c7a..1a1573c 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java
@@ -30,7 +30,6 @@ import static
org.apache.dolphinscheduler.common.enums.DataType.VARCHAR;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.datasource.ConnectionParam;
import org.apache.dolphinscheduler.common.datasource.DatasourceUtil;
-import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.Direct;
@@ -119,12 +118,7 @@ public class ProcedureTask extends AbstractTask {
connection = DatasourceUtil.getConnection(dbType, connectionParam);
// combining local and global parameters
- Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
- taskExecutionContext.getDefinedParams(),
- procedureParameters.getLocalParametersMap(),
- procedureParameters.getVarPoolMap(),
-
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
- taskExecutionContext.getScheduleTime());
+ Map<String, Property> paramsMap =
ParamUtils.convert(taskExecutionContext,getParameters());
// call method
stmt = connection.prepareCall(procedureParameters.getMethod());
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
index e784a79..0ee480d 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
@@ -14,25 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.worker.task.python;
+package org.apache.dolphinscheduler.server.worker.task.python;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.python.PythonParameters;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.common.utils.VarPoolUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor;
-import org.slf4j.Logger;
import java.util.Map;
+import org.slf4j.Logger;
+
/**
* python task
*/
@@ -115,13 +116,8 @@ public class PythonTask extends AbstractTask {
private String buildCommand() throws Exception {
String rawPythonScript =
pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
- // replace placeholder
- Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
- taskExecutionContext.getDefinedParams(),
- pythonParameters.getLocalParametersMap(),
- pythonParameters.getVarPoolMap(),
-
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
- taskExecutionContext.getScheduleTime());
+ // combining local and global parameters
+ Map<String, Property> paramsMap =
ParamUtils.convert(taskExecutionContext,getParameters());
try {
rawPythonScript =
VarPoolUtils.convertPythonScriptPlaceholders(rawPythonScript);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
index f7887df..32c2ad1 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
@@ -21,7 +21,6 @@ import static java.util.Calendar.DAY_OF_MONTH;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
-import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
@@ -42,10 +41,8 @@ import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
-import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -166,7 +163,7 @@ public class ShellTask extends AbstractTask {
private String parseScript(String script) {
// combining local and global parameters
- Map<String, Property> paramsMap =
ParamUtils.convert(taskExecutionContext,shellParameters);
+ Map<String, Property> paramsMap =
ParamUtils.convert(taskExecutionContext,getParameters());
// replace variable TIME with $[YYYYmmddd...] in shell file when
history run job and batch complement job
if (taskExecutionContext.getScheduleTime() != null) {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
index a5a641c..6939439 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.task.spark;
-import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.SparkVersion;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
@@ -109,13 +108,8 @@ public class SparkTask extends AbstractYarnTask {
// other parameters
args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
- // replace placeholder
- Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
- taskExecutionContext.getDefinedParams(),
- sparkParameters.getLocalParametersMap(),
- sparkParameters.getVarPoolMap(),
- CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
- taskExecutionContext.getScheduleTime());
+ // replace placeholder, and combining local and global parameters
+ Map<String, Property> paramsMap =
ParamUtils.convert(taskExecutionContext,getParameters());
String command = null;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
index b174734..9dd8b51 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.worker.task.sql;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.common.datasource.DatasourceUtil;
-import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
@@ -166,14 +165,8 @@ public class SqlTask extends AbstractTask {
Map<Integer, Property> sqlParamsMap = new HashMap<>();
StringBuilder sqlBuilder = new StringBuilder();
- // find process instance by task id
-
- Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
- taskExecutionContext.getDefinedParams(),
- sqlParameters.getLocalParametersMap(),
- sqlParameters.getVarPoolMap(),
- CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
- taskExecutionContext.getScheduleTime());
+ // combining local and global parameters
+ Map<String, Property> paramsMap =
ParamUtils.convert(taskExecutionContext,getParameters());
// spell SQL according to the final user-defined variable
if (paramsMap == null) {
@@ -276,7 +269,6 @@ public class SqlTask extends AbstractTask {
}
}
-
public String setNonQuerySqlReturn(String updateResult, List<Property>
properties) {
String result = null;
for (Property info :properties) {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
index 1d1b32d..2f3e48d 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.task.sqoop;
-import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
@@ -73,12 +72,8 @@ public class SqoopTask extends AbstractYarnTask {
SqoopJobGenerator generator = new SqoopJobGenerator();
String script = generator.generateSqoopJob(sqoopParameters,
sqoopTaskExecutionContext);
- Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(sqoopTaskExecutionContext.getDefinedParams()),
- sqoopTaskExecutionContext.getDefinedParams(),
- sqoopParameters.getLocalParametersMap(),
- sqoopParameters.getVarPoolMap(),
- CommandType.of(sqoopTaskExecutionContext.getCmdTypeIfComplement()),
- sqoopTaskExecutionContext.getScheduleTime());
+ // combining local and global parameters
+ Map<String, Property> paramsMap =
ParamUtils.convert(sqoopTaskExecutionContext,getParameters());
if (paramsMap != null) {
String resultScripts =
ParameterUtils.convertParameterPlaceholders(script,
ParamUtils.convert(paramsMap));
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java
index 12613c6..c3fa0fc 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java
@@ -14,27 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.enums.CommandType;
-import org.apache.dolphinscheduler.common.enums.DataType;
-import org.apache.dolphinscheduler.common.enums.Direct;
-import org.apache.dolphinscheduler.common.process.Property;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
-import org.apache.dolphinscheduler.server.utils.ParamUtils;
import java.util.Calendar;
import java.util.Date;
-import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* user define param
*/
@@ -42,9 +36,8 @@ public class ParamsTest {
private static final Logger logger =
LoggerFactory.getLogger(ParamsTest.class);
-
@Test
- public void systemParamsTest()throws Exception{
+ public void systemParamsTest() throws Exception {
String command = "${system.biz.date}";
// start process
@@ -56,12 +49,10 @@ public class ParamsTest {
logger.info("start process : {}",command);
-
Calendar calendar = Calendar.getInstance();
calendar.setTime(new Date());
calendar.add(Calendar.DAY_OF_MONTH, -5);
-
command = "${system.biz.date}";
// complement data
timeParams = BusinessTimeUtils
@@ -71,40 +62,4 @@ public class ParamsTest {
logger.info("complement data : {}",command);
}
-
- @Test
- public void convertTest() throws Exception {
- Map<String, Property> globalParams = new HashMap<>();
- Property property = new Property();
- property.setProp("global_param");
- property.setDirect(Direct.IN);
- property.setType(DataType.VARCHAR);
- property.setValue("${system.biz.date}");
- globalParams.put("global_param", property);
-
- Map<String, String> globalParamsMap = new HashMap<>();
- globalParamsMap.put("global_param", "${system.biz.date}");
-
- Map<String, Property> localParams = new HashMap<>();
- Property localProperty = new Property();
- localProperty.setProp("local_param");
- localProperty.setDirect(Direct.IN);
- localProperty.setType(DataType.VARCHAR);
- localProperty.setValue("${global_param}");
- localParams.put("local_param", localProperty);
-
- Map<String, Property> varPoolParams = new HashMap<>();
- Property varProperty = new Property();
- varProperty.setProp("local_param");
- varProperty.setDirect(Direct.IN);
- varProperty.setType(DataType.VARCHAR);
- varProperty.setValue("${global_param}");
- varPoolParams.put("varPool", varProperty);
-
- Map<String, Property> paramsMap = ParamUtils.convert(globalParams,
globalParamsMap,
- localParams,varPoolParams, CommandType.START_PROCESS, new
Date());
- logger.info(JSONUtils.toJsonString(paramsMap));
-
-
- }
}
\ No newline at end of file
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
index 99a6eb2..4d7bc93 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.TaskType;
@@ -85,7 +84,7 @@ public class ParamUtilsTest {
localParams.put("local_param", localProperty);
Property varProperty = new Property();
- varProperty.setProp("local_param");
+ varProperty.setProp("varPool");
varProperty.setDirect(Direct.IN);
varProperty.setType(DataType.VARCHAR);
varProperty.setValue("${global_param}");
@@ -93,42 +92,72 @@ public class ParamUtilsTest {
}
/**
- * Test convert
+ * This is basic test case for ParamUtils.convert.
+ * Warning:
+ * As you can see,this case invokes the function of convert in different
situations. When you first invoke the function of convert,
+ * the variables of localParams and varPool in the ShellParameters will
be modified. But in the whole system the variables of localParams
+ * and varPool have been used in other functions. I'm not sure if this
current situation is wrong. So I cannot modify the original logic.
*/
@Test
public void testConvert() {
//The expected value
- String expected =
"{\"varPool\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+ String expected =
"{\"varPool\":{\"prop\":\"varPool\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+
"\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+
"\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}";
+
//The expected value when globalParams is null but localParams is not
null
- String expected1 =
"{\"varPool\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
- +
"\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+ String expected1 =
"{\"varPool\":{\"prop\":\"varPool\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+
"\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}";
//Define expected date , the month is 0-base
Calendar calendar = Calendar.getInstance();
calendar.set(2019, 11, 30);
Date date = calendar.getTime();
+ List<Property> globalParamList =
globalParams.values().stream().collect(Collectors.toList());
+ List<Property> localParamList =
localParams.values().stream().collect(Collectors.toList());
+ List<Property> varPoolParamList =
varPoolParams.values().stream().collect(Collectors.toList());
+
+ TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+ taskExecutionContext.setTaskInstanceId(1);
+ taskExecutionContext.setTaskName("params test");
+ taskExecutionContext.setTaskType(TaskType.SHELL.getDesc());
+ taskExecutionContext.setHost("127.0.0.1:1234");
+ taskExecutionContext.setExecutePath("/tmp/test");
+ taskExecutionContext.setLogPath("/log");
+ taskExecutionContext.setProcessInstanceId(1);
+ taskExecutionContext.setExecutorId(1);
+ taskExecutionContext.setCmdTypeIfComplement(0);
+ taskExecutionContext.setScheduleTime(date);
+
taskExecutionContext.setGlobalParams(JSONUtils.toJsonString(globalParamList));
+ taskExecutionContext.setDefinedParams(globalParamsMap);
+
taskExecutionContext.setVarPool("[{\"prop\":\"varPool\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"${global_param}\"}]");
+ taskExecutionContext.setTaskParams(
+ "{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd
HH:mm:ss]\\necho \\\" ${task_execution_id} \\\"\\necho \\\"
${task_execution_path}\\\"\\n\","
+ + "\"localParams\":"
+ + "[],\"resourceList\":[]}");
+
+ ShellParameters shellParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
ShellParameters.class);
+ shellParameters.setLocalParams(localParamList);
+
+ String varPoolParamsJson =
JSONUtils.toJsonString(varPoolParams,SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
+ shellParameters.setVarPool(taskExecutionContext.getVarPool());
+ shellParameters.dealOutParam(varPoolParamsJson);
+
//Invoke convert
- Map<String, Property> paramsMap = ParamUtils.convert(globalParams,
globalParamsMap, localParams, varPoolParams,CommandType.START_PROCESS, date);
+ Map<String, Property> paramsMap =
ParamUtils.convert(taskExecutionContext, shellParameters);
String result = JSONUtils.toJsonString(paramsMap);
assertEquals(expected, result);
- for (Map.Entry<String, Property> entry : paramsMap.entrySet()) {
-
- String key = entry.getKey();
- Property prop = entry.getValue();
- logger.info(key + " : " + prop.getValue());
- }
-
//Invoke convert with null globalParams
- Map<String, Property> paramsMap1 = ParamUtils.convert(null,
globalParamsMap, localParams,varPoolParams, CommandType.START_PROCESS, date);
+ taskExecutionContext.setDefinedParams(null);
+ Map<String, Property> paramsMap1 =
ParamUtils.convert(taskExecutionContext, shellParameters);
+
String result1 = JSONUtils.toJsonString(paramsMap1);
assertEquals(expected1, result1);
- //Null check, invoke convert with null globalParams and null
localParams
- Map<String, Property> paramsMap2 = ParamUtils.convert(null,
globalParamsMap, null, varPoolParams,CommandType.START_PROCESS, date);
+ // Null check, invoke convert with null globalParams and null
localParams
+ shellParameters.setLocalParams(null);
+ Map<String, Property> paramsMap2 =
ParamUtils.convert(taskExecutionContext, shellParameters);
assertNull(paramsMap2);
}