This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 00ff88e  support custom datax configuration  (#2224)
00ff88e is described below

commit 00ff88ef1c4a8af8d2dc1324543338629dd3ca6b
Author: Simon <[email protected]>
AuthorDate: Thu Mar 19 18:56:58 2020 +0800

    support custom datax configuration  (#2224)
    
    * fix #1441
    
    * support custom datax config
    
    * support datax custom config
    
    * support datax custom config
    
    * support datax custom config
    
    Co-authored-by: 张世鸣 <[email protected]>
---
 .../dolphinscheduler/api/utils/CheckUtilsTest.java |  1 +
 .../common/task/datax/DataxParameters.java         | 46 ++++++++++++++++----
 .../server/worker/task/datax/DataxTask.java        | 39 +++++++++++++----
 .../server/worker/task/datax/DataxTaskTest.java    | 49 ++++++++++++++++------
 4 files changed, 106 insertions(+), 29 deletions(-)

diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java
index 24a0ed3..308ed8e 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java
@@ -211,6 +211,7 @@ public class CheckUtilsTest {
         // DataxParameters
         DataxParameters dataxParameters = new DataxParameters();
         
assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(dataxParameters),
 TaskType.DATAX.toString()));
+        dataxParameters.setCustomConfig(0);
         dataxParameters.setDataSource(111);
         dataxParameters.setDataTarget(333);
         dataxParameters.setSql("sql");
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java
index 95dd505..f153360 100755
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java
@@ -28,6 +28,16 @@ import 
org.apache.dolphinscheduler.common.task.AbstractParameters;
 public class DataxParameters extends AbstractParameters {
 
     /**
+     * whether a custom JSON config is used, e.g. 0 (built from the fields below), 1 (use json)
+     */
+    private Integer customConfig;
+
+    /**
+     * custom DataX job JSON; used only when customConfig equals 1
+     */
+    private String json;
+
+    /**
      * data source type,eg  MYSQL, POSTGRES ...
      */
     private String dsType;
@@ -77,6 +87,22 @@ public class DataxParameters extends AbstractParameters {
      */
     private int jobSpeedRecord;
 
+    public Integer getCustomConfig() {
+        return customConfig;
+    }
+
+    public void setCustomConfig(Integer customConfig) {
+        this.customConfig = customConfig;
+    }
+
+    public String getJson() {
+        return json;
+    }
+
+    public void setJson(String json) {
+        this.json = json;
+    }
+
     public String getDsType() {
         return dsType;
     }
@@ -157,16 +183,18 @@ public class DataxParameters extends AbstractParameters {
         this.jobSpeedRecord = jobSpeedRecord;
     }
 
+
     @Override
     public boolean checkParameters() {
-        if (!(dataSource != 0
-                && dataTarget != 0
-                && StringUtils.isNotEmpty(sql)
-                && StringUtils.isNotEmpty(targetTable))) {
-            return false;
+        if (customConfig == null) return false;
+        if (customConfig == 0) {
+            return dataSource != 0
+                    && dataTarget != 0
+                    && StringUtils.isNotEmpty(sql)
+                    && StringUtils.isNotEmpty(targetTable);
+        } else {
+            return StringUtils.isNotEmpty(json);
         }
-
-        return true;
     }
 
     @Override
@@ -177,7 +205,9 @@ public class DataxParameters extends AbstractParameters {
     @Override
     public String toString() {
         return "DataxParameters{" +
-                "dsType='" + dsType + '\'' +
+                "customConfig=" + customConfig +
+                ", json='" + json + '\'' +
+                ", dsType='" + dsType + '\'' +
                 ", dataSource=" + dataSource +
                 ", dtType='" + dtType + '\'' +
                 ", dataTarget=" + dataTarget +
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
index 952030e..8083bb6 100755
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
@@ -192,24 +192,47 @@ public class DataxTask extends AbstractTask {
         throws Exception {
         // generate json
         String fileName = String.format("%s/%s_job.json", taskDir, 
taskProps.getTaskAppId());
+        String json;
 
         Path path = new File(fileName).toPath();
         if (Files.exists(path)) {
             return fileName;
         }
 
-        JSONObject job = new JSONObject();
-        job.put("content", buildDataxJobContentJson());
-        job.put("setting", buildDataxJobSettingJson());
 
-        JSONObject root = new JSONObject();
-        root.put("job", job);
-        root.put("core", buildDataxCoreJson());
 
-        logger.debug("datax job json : {}", root.toString());
+        if (dataXParameters.getCustomConfig() == 1){
+
+            json = dataXParameters.getJson().replaceAll("\\r\\n", "\n");
+
+            /**
+             *  combining local and global parameters
+             */
+            Map<String, Property> paramsMap = 
ParamUtils.convert(taskProps.getUserDefParamsMap(),
+                    taskProps.getDefinedParams(),
+                    dataXParameters.getLocalParametersMap(),
+                    taskProps.getCmdTypeIfComplement(),
+                    taskProps.getScheduleTime());
+            if (paramsMap != null){
+                json = ParameterUtils.convertParameterPlaceholders(json, 
ParamUtils.convert(paramsMap));
+            }
+
+        }else {
+
+            JSONObject job = new JSONObject();
+            job.put("content", buildDataxJobContentJson());
+            job.put("setting", buildDataxJobSettingJson());
+
+            JSONObject root = new JSONObject();
+            root.put("job", job);
+            root.put("core", buildDataxCoreJson());
+            json = root.toString();
+        }
+
+        logger.debug("datax job json : {}", json);
 
         // create datax json file
-        FileUtils.writeStringToFile(new File(fileName), root.toString(), 
StandardCharsets.UTF_8);
+        FileUtils.writeStringToFile(new File(fileName), json, 
StandardCharsets.UTF_8);
         return fileName;
     }
 
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
index bd7f275..c2dbd26 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
@@ -44,6 +44,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.ApplicationContext;
 
+import static 
org.apache.dolphinscheduler.common.enums.CommandType.START_PROCESS;
+
 /**
  * DataxTask Tester.
  */
@@ -59,6 +61,8 @@ public class DataxTaskTest {
 
     private ApplicationContext applicationContext;
 
+    private TaskProps props = new TaskProps();
+
     @Before
     public void before()
         throws Exception {
@@ -70,7 +74,6 @@ public class DataxTaskTest {
         springApplicationContext.setApplicationContext(applicationContext);
         
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
 
-        TaskProps props = new TaskProps();
         props.setTaskDir("/tmp");
         props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
         props.setTaskInstId(1);
@@ -78,10 +81,8 @@ public class DataxTaskTest {
         props.setEnvFile(".dolphinscheduler_env.sh");
         props.setTaskStartTime(new Date());
         props.setTaskTimeout(0);
-        props.setTaskParams(
-            
"{\"targetTable\":\"test\",\"postStatements\":[],\"jobSpeedByte\":1024,\"jobSpeedRecord\":1000,\"dtType\":\"MYSQL\",\"datasource\":1,\"dsType\":\"MYSQL\",\"datatarget\":2,\"jobSpeedByte\":0,\"sql\":\"select
 1 as test from dual\",\"preStatements\":[\"delete from 
test\"],\"postStatements\":[\"delete from test\"]}");
-        dataxTask = PowerMockito.spy(new DataxTask(props, logger));
-        dataxTask.init();
+        props.setCmdTypeIfComplement(START_PROCESS);
+        setTaskParems(0);
 
         
Mockito.when(processService.findDataSourceById(1)).thenReturn(getDataSource());
         
Mockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource());
@@ -91,6 +92,22 @@ public class DataxTaskTest {
         Mockito.when(shellCommandExecutor.run(fileName, 
processService)).thenReturn(0);
     }
 
+    private void setTaskParems(Integer customConfig) {
+        if (customConfig == 1) {
+            props.setTaskParams(
+                    "{\"customConfig\":1, 
\"localParams\":[{\"prop\":\"test\",\"value\":\"38294729\"}],\"json\":\"{\\\"job\\\":{\\\"setting\\\":{\\\"speed\\\":{\\\"byte\\\":1048576},\\\"errorLimit\\\":{\\\"record\\\":0,\\\"percentage\\\":0.02}},\\\"content\\\":[{\\\"reader\\\":{\\\"name\\\":\\\"rdbmsreader\\\",\\\"parameter\\\":{\\\"username\\\":\\\"xxx\\\",\\\"password\\\":\\\"${test}\\\",\\\"column\\\":[\\\"id\\\",\\\"name\\\"],\\\"splitPk\\\":\\\"pk\\\",\\\"connection\\\":[{\\\"queryS
 [...]
+
+//                    
"{\"customConfig\":1,\"json\":\"{\\\"job\\\":{\\\"setting\\\":{\\\"speed\\\":{\\\"byte\\\":1048576},\\\"errorLimit\\\":{\\\"record\\\":0,\\\"percentage\\\":0.02}},\\\"content\\\":[{\\\"reader\\\":{\\\"name\\\":\\\"rdbmsreader\\\",\\\"parameter\\\":{\\\"username\\\":\\\"xxx\\\",\\\"password\\\":\\\"xxx\\\",\\\"column\\\":[\\\"id\\\",\\\"name\\\"],\\\"splitPk\\\":\\\"pk\\\",\\\"connection\\\":[{\\\"querySql\\\":[\\\"SELECT
 * from dual\\\"],\\\"jdbcUrl\\\":[\\\"jdbc:dm [...]
+        } else {
+            props.setTaskParams(
+                    
"{\"customConfig\":0,\"targetTable\":\"test\",\"postStatements\":[],\"jobSpeedByte\":1024,\"jobSpeedRecord\":1000,\"dtType\":\"MYSQL\",\"datasource\":1,\"dsType\":\"MYSQL\",\"datatarget\":2,\"jobSpeedByte\":0,\"sql\":\"select
 1 as test from dual\",\"preStatements\":[\"delete from 
test\"],\"postStatements\":[\"delete from test\"]}");
+
+        }
+
+        dataxTask = PowerMockito.spy(new DataxTask(props, logger));
+        dataxTask.init();
+    }
+
     private DataSource getDataSource() {
         DataSource dataSource = new DataSource();
         dataSource.setType(DbType.MYSQL);
@@ -102,7 +119,7 @@ public class DataxTaskTest {
 
     private ProcessInstance getProcessInstance() {
         ProcessInstance processInstance = new ProcessInstance();
-        processInstance.setCommandType(CommandType.START_PROCESS);
+        processInstance.setCommandType(START_PROCESS);
         processInstance.setScheduleTime(new Date());
         return processInstance;
     }
@@ -229,18 +246,24 @@ public class DataxTaskTest {
      */
     @Test
     public void testBuildDataxJsonFile()
-        throws Exception {
+            throws Exception {
         try {
-            Method method = 
DataxTask.class.getDeclaredMethod("buildDataxJsonFile");
-            method.setAccessible(true);
-            String filePath = (String) method.invoke(dataxTask, null);
-            Assert.assertNotNull(filePath);
-        }
-        catch (Exception e) {
+            setTaskParems(1);
+            buildDataJson();
+            setTaskParems(0);
+            buildDataJson();
+        } catch (Exception e) {
             Assert.fail(e.getMessage());
         }
     }
 
+    public void buildDataJson() throws Exception {
+        Method method = 
DataxTask.class.getDeclaredMethod("buildDataxJsonFile");
+        method.setAccessible(true);
+        String filePath = (String) method.invoke(dataxTask, null);
+        Assert.assertNotNull(filePath);
+    }
+
     /**
      * Method: buildDataxJobContentJson()
      */

Reply via email to