This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 00ff88e support custom datax configuration (#2224)
00ff88e is described below
commit 00ff88ef1c4a8af8d2dc1324543338629dd3ca6b
Author: Simon <[email protected]>
AuthorDate: Thu Mar 19 18:56:58 2020 +0800
support custom datax configuration (#2224)
* fix #1441
* support custom datax config
* support datax custom config
* support datax custom config
* support datax custom config
Co-authored-by: 张世鸣 <[email protected]>
---
.../dolphinscheduler/api/utils/CheckUtilsTest.java | 1 +
.../common/task/datax/DataxParameters.java | 46 ++++++++++++++++----
.../server/worker/task/datax/DataxTask.java | 39 +++++++++++++----
.../server/worker/task/datax/DataxTaskTest.java | 49 ++++++++++++++++------
4 files changed, 106 insertions(+), 29 deletions(-)
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java
index 24a0ed3..308ed8e 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java
@@ -211,6 +211,7 @@ public class CheckUtilsTest {
// DataxParameters
DataxParameters dataxParameters = new DataxParameters();
assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(dataxParameters),
TaskType.DATAX.toString()));
+ dataxParameters.setCustomConfig(0);
dataxParameters.setDataSource(111);
dataxParameters.setDataTarget(333);
dataxParameters.setSql("sql");
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java
index 95dd505..f153360 100755
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java
@@ -28,6 +28,16 @@ import
org.apache.dolphinscheduler.common.task.AbstractParameters;
public class DataxParameters extends AbstractParameters {
/**
+ * if custom json config,eg 0, 1
+ */
+ private Integer customConfig;
+
+ /**
+ * if customConfig eq 1 ,then json is usable
+ */
+ private String json;
+
+ /**
* data source type,eg MYSQL, POSTGRES ...
*/
private String dsType;
@@ -77,6 +87,22 @@ public class DataxParameters extends AbstractParameters {
*/
private int jobSpeedRecord;
+ public Integer getCustomConfig() {
+ return customConfig;
+ }
+
+ public void setCustomConfig(Integer customConfig) {
+ this.customConfig = customConfig;
+ }
+
+ public String getJson() {
+ return json;
+ }
+
+ public void setJson(String json) {
+ this.json = json;
+ }
+
public String getDsType() {
return dsType;
}
@@ -157,16 +183,18 @@ public class DataxParameters extends AbstractParameters {
this.jobSpeedRecord = jobSpeedRecord;
}
+
@Override
public boolean checkParameters() {
- if (!(dataSource != 0
- && dataTarget != 0
- && StringUtils.isNotEmpty(sql)
- && StringUtils.isNotEmpty(targetTable))) {
- return false;
+ if (customConfig == null) return false;
+ if (customConfig == 0) {
+ return dataSource != 0
+ && dataTarget != 0
+ && StringUtils.isNotEmpty(sql)
+ && StringUtils.isNotEmpty(targetTable);
+ } else {
+ return StringUtils.isNotEmpty(json);
}
-
- return true;
}
@Override
@@ -177,7 +205,9 @@ public class DataxParameters extends AbstractParameters {
@Override
public String toString() {
return "DataxParameters{" +
- "dsType='" + dsType + '\'' +
+ "customConfig=" + customConfig +
+ ", json='" + json + '\'' +
+ ", dsType='" + dsType + '\'' +
", dataSource=" + dataSource +
", dtType='" + dtType + '\'' +
", dataTarget=" + dataTarget +
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
index 952030e..8083bb6 100755
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
@@ -192,24 +192,47 @@ public class DataxTask extends AbstractTask {
throws Exception {
// generate json
String fileName = String.format("%s/%s_job.json", taskDir,
taskProps.getTaskAppId());
+ String json;
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
return fileName;
}
- JSONObject job = new JSONObject();
- job.put("content", buildDataxJobContentJson());
- job.put("setting", buildDataxJobSettingJson());
- JSONObject root = new JSONObject();
- root.put("job", job);
- root.put("core", buildDataxCoreJson());
- logger.debug("datax job json : {}", root.toString());
+ if (dataXParameters.getCustomConfig() == 1){
+
+ json = dataXParameters.getJson().replaceAll("\\r\\n", "\n");
+
+ /**
+ * combining local and global parameters
+ */
+ Map<String, Property> paramsMap =
ParamUtils.convert(taskProps.getUserDefParamsMap(),
+ taskProps.getDefinedParams(),
+ dataXParameters.getLocalParametersMap(),
+ taskProps.getCmdTypeIfComplement(),
+ taskProps.getScheduleTime());
+ if (paramsMap != null){
+ json = ParameterUtils.convertParameterPlaceholders(json,
ParamUtils.convert(paramsMap));
+ }
+
+ }else {
+
+ JSONObject job = new JSONObject();
+ job.put("content", buildDataxJobContentJson());
+ job.put("setting", buildDataxJobSettingJson());
+
+ JSONObject root = new JSONObject();
+ root.put("job", job);
+ root.put("core", buildDataxCoreJson());
+ json = root.toString();
+ }
+
+ logger.debug("datax job json : {}", json);
// create datax json file
- FileUtils.writeStringToFile(new File(fileName), root.toString(),
StandardCharsets.UTF_8);
+ FileUtils.writeStringToFile(new File(fileName), json,
StandardCharsets.UTF_8);
return fileName;
}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
index bd7f275..c2dbd26 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
@@ -44,6 +44,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
+import static
org.apache.dolphinscheduler.common.enums.CommandType.START_PROCESS;
+
/**
* DataxTask Tester.
*/
@@ -59,6 +61,8 @@ public class DataxTaskTest {
private ApplicationContext applicationContext;
+ private TaskProps props = new TaskProps();
+
@Before
public void before()
throws Exception {
@@ -70,7 +74,6 @@ public class DataxTaskTest {
springApplicationContext.setApplicationContext(applicationContext);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
- TaskProps props = new TaskProps();
props.setTaskDir("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstId(1);
@@ -78,10 +81,8 @@ public class DataxTaskTest {
props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date());
props.setTaskTimeout(0);
- props.setTaskParams(
-
"{\"targetTable\":\"test\",\"postStatements\":[],\"jobSpeedByte\":1024,\"jobSpeedRecord\":1000,\"dtType\":\"MYSQL\",\"datasource\":1,\"dsType\":\"MYSQL\",\"datatarget\":2,\"jobSpeedByte\":0,\"sql\":\"select
1 as test from dual\",\"preStatements\":[\"delete from
test\"],\"postStatements\":[\"delete from test\"]}");
- dataxTask = PowerMockito.spy(new DataxTask(props, logger));
- dataxTask.init();
+ props.setCmdTypeIfComplement(START_PROCESS);
+ setTaskParems(0);
Mockito.when(processService.findDataSourceById(1)).thenReturn(getDataSource());
Mockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource());
@@ -91,6 +92,22 @@ public class DataxTaskTest {
Mockito.when(shellCommandExecutor.run(fileName,
processService)).thenReturn(0);
}
+ private void setTaskParems(Integer customConfig) {
+ if (customConfig == 1) {
+ props.setTaskParams(
+ "{\"customConfig\":1,
\"localParams\":[{\"prop\":\"test\",\"value\":\"38294729\"}],\"json\":\"{\\\"job\\\":{\\\"setting\\\":{\\\"speed\\\":{\\\"byte\\\":1048576},\\\"errorLimit\\\":{\\\"record\\\":0,\\\"percentage\\\":0.02}},\\\"content\\\":[{\\\"reader\\\":{\\\"name\\\":\\\"rdbmsreader\\\",\\\"parameter\\\":{\\\"username\\\":\\\"xxx\\\",\\\"password\\\":\\\"${test}\\\",\\\"column\\\":[\\\"id\\\",\\\"name\\\"],\\\"splitPk\\\":\\\"pk\\\",\\\"connection\\\":[{\\\"queryS
[...]
+
+//
"{\"customConfig\":1,\"json\":\"{\\\"job\\\":{\\\"setting\\\":{\\\"speed\\\":{\\\"byte\\\":1048576},\\\"errorLimit\\\":{\\\"record\\\":0,\\\"percentage\\\":0.02}},\\\"content\\\":[{\\\"reader\\\":{\\\"name\\\":\\\"rdbmsreader\\\",\\\"parameter\\\":{\\\"username\\\":\\\"xxx\\\",\\\"password\\\":\\\"xxx\\\",\\\"column\\\":[\\\"id\\\",\\\"name\\\"],\\\"splitPk\\\":\\\"pk\\\",\\\"connection\\\":[{\\\"querySql\\\":[\\\"SELECT
* from dual\\\"],\\\"jdbcUrl\\\":[\\\"jdbc:dm [...]
+ } else {
+ props.setTaskParams(
+
"{\"customConfig\":0,\"targetTable\":\"test\",\"postStatements\":[],\"jobSpeedByte\":1024,\"jobSpeedRecord\":1000,\"dtType\":\"MYSQL\",\"datasource\":1,\"dsType\":\"MYSQL\",\"datatarget\":2,\"jobSpeedByte\":0,\"sql\":\"select
1 as test from dual\",\"preStatements\":[\"delete from
test\"],\"postStatements\":[\"delete from test\"]}");
+
+ }
+
+ dataxTask = PowerMockito.spy(new DataxTask(props, logger));
+ dataxTask.init();
+ }
+
private DataSource getDataSource() {
DataSource dataSource = new DataSource();
dataSource.setType(DbType.MYSQL);
@@ -102,7 +119,7 @@ public class DataxTaskTest {
private ProcessInstance getProcessInstance() {
ProcessInstance processInstance = new ProcessInstance();
- processInstance.setCommandType(CommandType.START_PROCESS);
+ processInstance.setCommandType(START_PROCESS);
processInstance.setScheduleTime(new Date());
return processInstance;
}
@@ -229,18 +246,24 @@ public class DataxTaskTest {
*/
@Test
public void testBuildDataxJsonFile()
- throws Exception {
+ throws Exception {
try {
- Method method =
DataxTask.class.getDeclaredMethod("buildDataxJsonFile");
- method.setAccessible(true);
- String filePath = (String) method.invoke(dataxTask, null);
- Assert.assertNotNull(filePath);
- }
- catch (Exception e) {
+ setTaskParems(1);
+ buildDataJson();
+ setTaskParems(0);
+ buildDataJson();
+ } catch (Exception e) {
Assert.fail(e.getMessage());
}
}
+ public void buildDataJson() throws Exception {
+ Method method =
DataxTask.class.getDeclaredMethod("buildDataxJsonFile");
+ method.setAccessible(true);
+ String filePath = (String) method.invoke(dataxTask, null);
+ Assert.assertNotNull(filePath);
+ }
+
/**
* Method: buildDataxJobContentJson()
*/