This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 83d5350  [Feature-#130] pass global param values when starting new 
process instance (#4372)
83d5350 is described below

commit 83d53505de6cbd13fb988559dc409091a8d69264
Author: Dean Wong <[email protected]>
AuthorDate: Tue Jan 12 09:13:27 2021 +0800

    [Feature-#130] pass global param values when starting new process instance 
(#4372)
    
    * [DS-130][feat] pass global param values when starting new process instance
        add optional param for start-process-instance api
        reuse command_param in command table for persistence
        overload curingGlobalParams function in ParameterUtils
        not adapt the UI code yet
    
    * change import order
    
    * support datetime expression
    
    * print start params
    
    * (fix) avoid npe when cmdParam is null
    
    Change-Id: I3b4c4b5fa1df316ff221e27146e45d7d4d3a404e
---
 .../api/controller/ExecutorController.java           | 16 +++++++++++-----
 .../api/service/ExecutorService.java                 | 13 ++++++++++---
 .../api/service/ExecutorService2Test.java            | 16 ++++++++--------
 .../apache/dolphinscheduler/common/Constants.java    |  2 ++
 .../common/utils/ParameterUtilsTest.java             | 20 ++++++++++++++++++++
 .../service/process/ProcessService.java              | 19 +++++++++++++++++++
 .../service/process/ProcessServiceTest.java          | 15 ++++++++++++++-
 7 files changed, 84 insertions(+), 17 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
index 20f4285..b093483 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
@@ -22,6 +22,7 @@ import 
org.apache.dolphinscheduler.api.exceptions.ApiException;
 import org.apache.dolphinscheduler.api.service.ExecutorService;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.User;
 import io.swagger.annotations.*;
 import org.apache.dolphinscheduler.common.enums.*;
@@ -107,21 +108,26 @@ public class ExecutorController extends BaseController {
                                        @RequestParam(value = "runMode", 
required = false) RunMode runMode,
                                        @RequestParam(value = 
"processInstancePriority", required = false) Priority processInstancePriority,
                                        @RequestParam(value = "workerGroup", 
required = false, defaultValue = "default") String workerGroup,
-                                       @RequestParam(value = "timeout", 
required = false) Integer timeout) throws ParseException {
+                                       @RequestParam(value = "timeout", 
required = false) Integer timeout,
+                                       @RequestParam(value = "startParams", 
required = false) String startParams) throws ParseException {
         logger.info("login user {}, start process instance, project name: {}, 
process definition id: {}, schedule time: {}, "
                         + "failure policy: {}, node name: {}, node dep: {}, 
notify type: {}, "
-                        + "notify group id: {},receivers:{},receiversCc:{}, 
run mode: {},process instance priority:{}, workerGroup: {}, timeout: {}",
+                        + "notify group id: {},receivers:{},receiversCc:{}, 
run mode: {},process instance priority:{}, workerGroup: {}, timeout: {}, "
+                        + "startParams: {}",
                 loginUser.getUserName(), projectName, processDefinitionId, 
scheduleTime,
                 failureStrategy, startNodeList, taskDependType, warningType, 
workerGroup, receivers, receiversCc, runMode, processInstancePriority,
-                workerGroup, timeout);
+                workerGroup, timeout, startParams);
 
         if (timeout == null) {
             timeout = Constants.MAX_TASK_TIMEOUT;
         }
-
+        Map<String, String> startParamMap = null;
+        if (startParams != null) {
+            startParamMap = JSONUtils.toMap(startParams);
+        }
         Map<String, Object> result = 
execService.execProcessInstance(loginUser, projectName, processDefinitionId, 
scheduleTime, execType, failureStrategy,
                 startNodeList, taskDependType, warningType,
-                warningGroupId, receivers, receiversCc, runMode, 
processInstancePriority, workerGroup, timeout);
+                warningGroupId, receivers, receiversCc, runMode, 
processInstancePriority, workerGroup, timeout, startParamMap);
         return returnDataList(result);
     }
 
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
index 7a0fd0f..f53d902 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
@@ -21,6 +21,7 @@ import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D
 import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
 import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
 import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
+import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
 import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
 
 import org.apache.dolphinscheduler.api.enums.ExecuteType;
@@ -112,6 +113,7 @@ public class ExecutorService extends BaseService {
      * @param workerGroup worker group name
      * @param runMode run mode
      * @param timeout timeout
+     * @param startParams the global param values which pass to new process 
instance
      * @return execute process instance code
      * @throws ParseException Parse Exception
      */
@@ -120,7 +122,8 @@ public class ExecutorService extends BaseService {
                                                    FailureStrategy 
failureStrategy, String startNodeList,
                                                    TaskDependType 
taskDependType, WarningType warningType, int warningGroupId,
                                                    String receivers, String 
receiversCc, RunMode runMode,
-                                                   Priority 
processInstancePriority, String workerGroup, Integer timeout) throws 
ParseException {
+                                                   Priority 
processInstancePriority, String workerGroup, Integer timeout,
+                                                   Map<String, String> 
startParams) throws ParseException {
         Map<String, Object> result = new HashMap<>();
         // timeout is invalid
         if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) {
@@ -157,7 +160,7 @@ public class ExecutorService extends BaseService {
          */
         int create = this.createCommand(commandType, processDefinitionId,
                 taskDependType, failureStrategy, startNodeList, cronTime, 
warningType, loginUser.getId(),
-                warningGroupId, runMode, processInstancePriority, workerGroup);
+                warningGroupId, runMode, processInstancePriority, workerGroup, 
startParams);
         if (create > 0) {
             /**
              * according to the process definition ID updateProcessInstance 
and CC recipient
@@ -502,7 +505,8 @@ public class ExecutorService extends BaseService {
                               TaskDependType nodeDep, FailureStrategy 
failureStrategy,
                               String startNodeList, String schedule, 
WarningType warningType,
                               int executorId, int warningGroupId,
-                              RunMode runMode, Priority 
processInstancePriority, String workerGroup) throws ParseException {
+                              RunMode runMode, Priority 
processInstancePriority, String workerGroup,
+                              Map<String, String> startParams) throws 
ParseException {
 
         /**
          * instantiate command schedule instance
@@ -529,6 +533,9 @@ public class ExecutorService extends BaseService {
         if (warningType != null) {
             command.setWarningType(warningType);
         }
+        if (startParams != null && startParams.size() > 0) {
+            cmdParam.put(CMD_PARAM_START_PARAMS, 
JSONUtils.toJsonString(startParams));
+        }
         command.setCommandParam(JSONUtils.toJsonString(cmdParam));
         command.setExecutorId(executorId);
         command.setWarningGroupId(warningGroupId);
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
index 09b9f7d..93dc3bc 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
@@ -149,7 +149,7 @@ public class ExecutorService2Test {
                     null, null,
                     null, null, 0,
                     "", "", RunMode.RUN_MODE_SERIAL,
-                    Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110);
+                    Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
             Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
             verify(processService, times(1)).createCommand(any(Command.class));
         } catch (Exception e) {
@@ -169,7 +169,7 @@ public class ExecutorService2Test {
                     null, "n1,n2",
                     null, null, 0,
                     "", "", RunMode.RUN_MODE_SERIAL,
-                    Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110);
+                    Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
             Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
             verify(processService, times(1)).createCommand(any(Command.class));
         } catch (Exception e) {
@@ -190,7 +190,7 @@ public class ExecutorService2Test {
                     null, null,
                     null, null, 0,
                     "", "", RunMode.RUN_MODE_SERIAL,
-                    Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110);
+                    Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
             Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, 
result.get(Constants.STATUS));
             verify(processService, times(0)).createCommand(any(Command.class));
         } catch (Exception e) {
@@ -210,7 +210,7 @@ public class ExecutorService2Test {
                     null, null,
                     null, null, 0,
                     "", "", RunMode.RUN_MODE_SERIAL,
-                    Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110);
+                    Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
             Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
             verify(processService, times(1)).createCommand(any(Command.class));
         } catch (Exception e) {
@@ -230,7 +230,7 @@ public class ExecutorService2Test {
                     null, null,
                     null, null, 0,
                     "", "", RunMode.RUN_MODE_PARALLEL,
-                    Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110);
+                    Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
             Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
             verify(processService, 
times(31)).createCommand(any(Command.class));
         } catch (Exception e) {
@@ -250,7 +250,7 @@ public class ExecutorService2Test {
                     null, null,
                     null, null, 0,
                     "", "", RunMode.RUN_MODE_PARALLEL,
-                    Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110);
+                    Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
             Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
             verify(processService, 
times(15)).createCommand(any(Command.class));
         } catch (Exception e) {
@@ -260,14 +260,14 @@ public class ExecutorService2Test {
 
     @Test
     public void testNoMsterServers() throws ParseException {
-        Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(new 
ArrayList<Server>());
+        Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(new 
ArrayList<>());
 
         Map<String, Object> result = 
executorService.execProcessInstance(loginUser, projectName,
                 processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA,
                 null, null,
                 null, null, 0,
                 "", "", RunMode.RUN_MODE_PARALLEL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110);
+                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
         Assert.assertEquals(result.get(Constants.STATUS), 
Status.MASTER_NOT_EXISTS);
 
     }
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index cd6008d..17fa753 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -458,6 +458,8 @@ public final class Constants {
 
     public static final String CMD_PARAM_START_NODE_NAMES = 
"StartNodeNameList";
 
+    public static final String CMD_PARAM_START_PARAMS = "StartParams";
+
     /**
      * complement data start date
      */
diff --git 
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java
 
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java
index 796a5fa..9ebfd69 100644
--- 
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java
+++ 
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java
@@ -132,6 +132,26 @@ public class ParameterUtilsTest {
 
         String result5 = ParameterUtils.curingGlobalParams(globalParamMap, 
globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime);
         Assert.assertEquals(result5, JSONUtils.toJsonString(globalParamList));
+
+        Property testStartParamProperty = new Property("testStartParam", 
Direct.IN, DataType.VARCHAR, "");
+        globalParamList.add(testStartParamProperty);
+        Property testStartParam2Property = new Property("testStartParam2", 
Direct.IN, DataType.VARCHAR, "$[yyyy-MM-dd+1]");
+        globalParamList.add(testStartParam2Property);
+        globalParamMap.put("testStartParam", "");
+        globalParamMap.put("testStartParam2", "$[yyyy-MM-dd+1]");
+
+        Map<String, String> startParamMap = new HashMap<>(2);
+        startParamMap.put("testStartParam", "$[yyyyMMdd]");
+
+        for (Map.Entry<String, String> param : globalParamMap.entrySet()) {
+            String val = startParamMap.get(param.getKey());
+            if (val != null) {
+                param.setValue(val);
+            }
+        }
+
+        String result6 = ParameterUtils.curingGlobalParams(globalParamMap, 
globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime);
+        Assert.assertTrue(result6.contains("20191220"));
     }
 
     /**
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 1499004..d77eb79 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -558,6 +558,25 @@ public class ProcessService {
         processInstance.setCommandStartTime(command.getStartTime());
         processInstance.setLocations(processDefinition.getLocations());
         processInstance.setConnects(processDefinition.getConnects());
+
+        // get start params from command param
+        Map<String, String> startParamMap = null;
+        if (cmdParam != null && 
cmdParam.containsKey(Constants.CMD_PARAM_START_PARAMS)) {
+            String startParamJson = 
cmdParam.get(Constants.CMD_PARAM_START_PARAMS);
+            startParamMap = JSONUtils.toMap(startParamJson);
+        }
+
+        // set start param into global params
+        if (startParamMap != null && startParamMap.size() > 0
+                && processDefinition.getGlobalParamMap() != null) {
+            for (Map.Entry<String, String> param : 
processDefinition.getGlobalParamMap().entrySet()) {
+                String val = startParamMap.get(param.getKey());
+                if (val != null) {
+                    param.setValue(val);
+                }
+            }
+        }
+
         // curing global params
         processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
             processDefinition.getGlobalParamMap(),
diff --git 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 4ac91f0..db83e71 100644
--- 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.service.process;
 
 import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
+import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
 import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
 
 import org.apache.dolphinscheduler.common.Constants;
@@ -234,13 +235,14 @@ public class ProcessServiceTest {
         processDefinition.setId(123);
         processDefinition.setName("test");
         processDefinition.setVersion(1);
-        
processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+        
processDefinition.setProcessDefinitionJson("{\"globalParams\":[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}],\"tasks\":[{\"conditionResult\":"
                 + 
"{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
                 + 
",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
                 + ",\"params\":{\"localParams\":[],\"rawScript\":\"echo 
\\\"123123\\\"\",\"resourceList\":[]}"
                 + 
",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\""
                 + 
",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\""
                 + 
",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}");
+        
processDefinition.setGlobalParams("[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
         ProcessInstance processInstance = new ProcessInstance();
         processInstance.setId(222);
         
Mockito.when(processDefineMapper.selectById(command1.getProcessDefinitionId())).thenReturn(processDefinition);
@@ -265,6 +267,17 @@ public class ProcessServiceTest {
         
command4.setCommandParam("{\"WaitingThreadInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
         command4.setCommandType(CommandType.REPEAT_RUNNING);
         Assert.assertNotNull(processService.handleCommand(logger, host, 
validThreadNum, command4));
+
+        Command command5 = new Command();
+        command5.setProcessDefinitionId(123);
+        HashMap<String, String> startParams = new HashMap<>();
+        startParams.put("startParam1", "testStartParam1");
+        HashMap<String, String> commandParams = new HashMap<>();
+        commandParams.put(CMD_PARAM_START_PARAMS, 
JSONUtils.toJsonString(startParams));
+        command5.setCommandParam(JSONUtils.toJsonString(commandParams));
+        command5.setCommandType(CommandType.START_PROCESS);
+        ProcessInstance processInstance1 = 
processService.handleCommand(logger, host, validThreadNum, command5);
+        
Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));
     }
 
     @Test

Reply via email to