This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 3b58145 [Feature]Add Python task "task variable / result transfer"
implementation (#3659)
3b58145 is described below
commit 3b581455fc572c01c93a9d5ecfb955a966a20263
Author: guyinyou <[email protected]>
AuthorDate: Fri Sep 25 13:20:42 2020 +0800
[Feature]Add Python task "task variable / result transfer" implementation
(#3659)
* 增加Python Task的“任务变量/结果传递”实现
Signed-off-by: 古崟佑
* add two files license
Signed-off-by: 古崟佑
* fix 'server/worker/task/AbstractCommandExecutor.java' code style
Signed-off-by: 1941815847Cy4 <[email protected]>
* update DB
Signed-off-by: 古崟佑
* update DB -- 2
Signed-off-by: 古崟佑
* fix codeStyle
Signed-off-by: 古崟佑
* fix codestyle
Signed-off-by: 古崟佑
* fix codeStyle
Signed-off-by: 古崟佑
* fix codeStyle
Signed-off-by: 古崟佑
* fix codeStyle
Signed-off-by: 古崟佑
* add VarPoolUtils Test
Signed-off-by: 古崟佑
* fix VarPoolUtilsTest codeStyle
Signed-off-by: 古崟佑
* fix VarPoolUtilsTest codeStyle
Signed-off-by: 古崟佑
* fix VarPoolUtilsTest codeStyle
Signed-off-by: 古崟佑
* fix VarPoolUtilsTest codeStyle
Signed-off-by: 古崟佑
* fix VarPoolUtilsTest codeStyle
Signed-off-by: 古崟佑
* add test config for VarPoolUtilsTest
Signed-off-by: 古崟佑
* fix unit test
Signed-off-by: 古崟佑
* fix codeStyle
Signed-off-by: 古崟佑
* fix VarPoolUtilsTest.java
Signed-off-by: 古崟佑
* fix
Signed-off-by: 古崟佑
* change the test class path
Signed-off-by: 古崟佑
* fix
Signed-off-by: 古崟佑
* fix "print the error message"
Signed-off-by: 古崟佑
* fix bug
Signed-off-by: 古崟佑
* fix
Signed-off-by: 古崟佑
Co-authored-by: 1941815847Cy4 <[email protected]>
---
.../dolphinscheduler/common/task/TaskParams.java | 78 +++++++++
.../common/utils/VarPoolUtils.java | 124 ++++++++++++++
.../common/utils/VarPoolUtilsTest.java | 73 ++++++++
.../dao/entity/ProcessInstance.java | 13 ++
.../dolphinscheduler/dao/entity/TaskInstance.java | 12 ++
.../remote/command/TaskExecuteResponseCommand.java | 12 ++
.../master/processor/TaskResponseProcessor.java | 3 +-
.../master/processor/queue/TaskResponseEvent.java | 18 +-
.../processor/queue/TaskResponseService.java | 3 +-
.../server/master/runner/MasterExecThread.java | 15 +-
.../server/worker/runner/TaskExecuteThread.java | 1 +
.../worker/task/AbstractCommandExecutor.java | 15 +-
.../server/worker/task/AbstractTask.java | 13 ++
.../server/worker/task/python/PythonTask.java | 189 +++++++++++----------
.../service/process/ProcessService.java | 5 +-
pom.xml | 1 +
sql/dolphinscheduler-postgre.sql | 2 +
sql/dolphinscheduler_mysql.sql | 2 +
.../1.3.3_schema/mysql/dolphinscheduler_ddl.sql | 40 +++++
.../postgresql/dolphinscheduler_ddl.sql | 36 ++++
20 files changed, 555 insertions(+), 100 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/TaskParams.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/TaskParams.java
new file mode 100644
index 0000000..abea2d9
--- /dev/null
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/TaskParams.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.task;
+
+import java.util.Map;
+
+public class TaskParams {
+
+ private String rawScript;
+ private Map<String, String>[] localParams;
+
+ public void setRawScript(String rawScript) {
+ this.rawScript = rawScript;
+ }
+
+ public void setLocalParams(Map<String, String>[] localParams) {
+ this.localParams = localParams;
+ }
+
+ public String getRawScript() {
+ return rawScript;
+ }
+
+ public void setLocalParamValue(String prop, Object value) {
+ if (localParams == null || value == null) {
+ return;
+ }
+ for (int i = 0; i < localParams.length; i++) {
+ if (localParams[i].get("prop").equals(prop)) {
+ localParams[i].put("value", (String)value);
+ }
+ }
+ }
+
+ public void setLocalParamValue(Map<String, Object> propToValue) {
+ if (localParams == null || propToValue == null) {
+ return;
+ }
+ for (int i = 0; i < localParams.length; i++) {
+ String prop = localParams[i].get("prop");
+ if (propToValue.containsKey(prop)) {
+ localParams[i].put("value",(String)propToValue.get(prop));
+ }
+ }
+ }
+
+ public String getLocalParamValue(String prop) {
+ if (localParams == null) {
+ return null;
+ }
+ for (int i = 0; i < localParams.length; i++) {
+ String tmpProp = localParams[i].get("prop");
+ if (tmpProp.equals(prop)) {
+ return localParams[i].get("value");
+ }
+ }
+ return null;
+ }
+
+ public Map<String, String>[] getLocalParams() {
+ return localParams;
+ }
+}
\ No newline at end of file
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
new file mode 100644
index 0000000..837e96f
--- /dev/null
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.utils;
+
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.task.TaskParams;
+
+import java.text.ParseException;
+import java.util.Map;
+
+public class VarPoolUtils {
+ /**
+ * getTaskNodeLocalParam
+ * @param taskNode taskNode
+ * @param prop prop
+ * @return localParamForProp
+ */
+ public static Object getTaskNodeLocalParam(TaskNode taskNode, String prop)
{
+ String taskParamsJson = taskNode.getParams();
+ TaskParams taskParams = JSONUtils.parseObject(taskParamsJson,
TaskParams.class);
+ if (taskParams == null) {
+ return null;
+ }
+ return taskParams.getLocalParamValue(prop);
+ }
+
+ /**
+ * setTaskNodeLocalParams
+ * @param taskNode taskNode
+ * @param prop LocalParamName
+ * @param value LocalParamValue
+ */
+ public static void setTaskNodeLocalParams(TaskNode taskNode, String prop,
Object value) {
+ String taskParamsJson = taskNode.getParams();
+ TaskParams taskParams = JSONUtils.parseObject(taskParamsJson,
TaskParams.class);
+ if (taskParams == null) {
+ return;
+ }
+ taskParams.setLocalParamValue(prop, value);
+ taskNode.setParams(JSONUtils.toJsonString(taskParams));
+ }
+
+ /**
+ * setTaskNodeLocalParams
+ * @param taskNode taskNode
+ * @param propToValue propToValue
+ */
+ public static void setTaskNodeLocalParams(TaskNode taskNode, Map<String,
Object> propToValue) {
+ String taskParamsJson = taskNode.getParams();
+ TaskParams taskParams = JSONUtils.parseObject(taskParamsJson,
TaskParams.class);
+ if (taskParams == null) {
+ return;
+ }
+ taskParams.setLocalParamValue(propToValue);
+ taskNode.setParams(JSONUtils.toJsonString(taskParams));
+ }
+
+ /**
+ * convertVarPoolToMap
+ * @param propToValue propToValue
+ * @param varPool varPool
+ * @throws ParseException ParseException
+ */
+ public static void convertVarPoolToMap(Map<String, Object> propToValue,
String varPool) throws ParseException {
+ if (varPool == null || propToValue == null) {
+ return;
+ }
+ String[] splits = varPool.split("\\$VarPool\\$");
+ for (String kv : splits) {
+ String[] kvs = kv.split(",");
+ if (kvs.length == 2) {
+ propToValue.put(kvs[0], kvs[1]);
+ } else {
+ throw new ParseException(kv, 2);
+ }
+ }
+ }
+
+ /**
+ * convertPythonScriptPlaceholders
+ * @param rawScript rawScript
+ * @return String
+ * @throws StringIndexOutOfBoundsException StringIndexOutOfBoundsException
+ */
+ public static String convertPythonScriptPlaceholders(String rawScript)
throws StringIndexOutOfBoundsException {
+ int len = "${setShareVar(${".length();
+ int scriptStart = 0;
+ while ((scriptStart = rawScript.indexOf("${setShareVar(${",
scriptStart)) != -1) {
+ int start = -1;
+ int end = rawScript.indexOf('}', scriptStart + len);
+ String prop = rawScript.substring(scriptStart + len, end);
+
+ start = rawScript.indexOf(',', end);
+ end = rawScript.indexOf(')', start);
+
+ String value = rawScript.substring(start + 1, end);
+
+ start = rawScript.indexOf('}', start) + 1;
+ end = rawScript.length();
+
+ String replaceScript =
String.format("print(\"${{setValue({},{})}}\".format(\"%s\",%s))", prop, value);
+
+ rawScript = rawScript.substring(0, scriptStart) + replaceScript +
rawScript.substring(start, end);
+
+ scriptStart += replaceScript.length();
+ }
+ return rawScript;
+ }
+}
\ No newline at end of file
diff --git
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java
new file mode 100644
index 0000000..e47203c
--- /dev/null
+++
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.utils;
+
+import org.apache.dolphinscheduler.common.model.TaskNode;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VarPoolUtilsTest {
+
+ private static final Logger logger =
LoggerFactory.getLogger(VarPoolUtilsTest.class);
+
+ @Test
+ public void testSetTaskNodeLocalParams() {
+ String taskJson =
"{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ +
"\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-75298\",\"maxRetryTimes\":0,\"name\":\"a1\","
+ + "\"params\":\"{\\\"rawScript\\\":\\\"print(\\\\\\\"this is
python task \\\\\\\",${p0})\\\","
+ +
"\\\"localParams\\\":[{\\\"prop\\\":\\\"p1\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"1\\\"}],"
+ +
"\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\","
+ +
"\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+ + "\"type\":\"PYTHON\",\"workerGroup\":\"default\"}";
+ TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class);
+
+ VarPoolUtils.setTaskNodeLocalParams(taskNode, "p1", "test1");
+ Assert.assertEquals(VarPoolUtils.getTaskNodeLocalParam(taskNode,
"p1"), "test1");
+
+ ConcurrentHashMap<String, Object> propToValue = new
ConcurrentHashMap<String, Object>();
+ propToValue.put("p1", "test2");
+
+ VarPoolUtils.setTaskNodeLocalParams(taskNode, propToValue);
+ Assert.assertEquals(VarPoolUtils.getTaskNodeLocalParam(taskNode,
"p1"), "test2");
+ }
+
+ @Test
+ public void testConvertVarPoolToMap() throws Exception {
+ String varPool = "p1,66$VarPool$p2,69$VarPool$";
+ ConcurrentHashMap<String, Object> propToValue = new
ConcurrentHashMap<String, Object>();
+ VarPoolUtils.convertVarPoolToMap(propToValue, varPool);
+ Assert.assertEquals((String)propToValue.get("p1"), "66");
+ Assert.assertEquals((String)propToValue.get("p2"), "69");
+ logger.info(propToValue.toString());
+ }
+
+ @Test
+ public void testConvertPythonScriptPlaceholders() throws Exception {
+ String rawScript =
"print(${p1});\n${setShareVar(${p1},3)};\n${setShareVar(${p2},4)};";
+ rawScript = VarPoolUtils.convertPythonScriptPlaceholders(rawScript);
+ Assert.assertEquals(rawScript, "print(${p1});\n"
+ + "print(\"${{setValue({},{})}}\".format(\"p1\",3));\n"
+ + "print(\"${{setValue({},{})}}\".format(\"p2\",4));");
+ logger.info(rawScript);
+ }
+}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
index 3d1a756..e3a3f11 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
@@ -225,6 +225,11 @@ public class ProcessInstance {
private int tenantId;
/**
+ * varPool string
+ */
+ private String varPool;
+
+ /**
* receivers for api
*/
@TableField(exist = false)
@@ -256,6 +261,14 @@ public class ProcessInstance {
DateUtils.getCurrentTimeStamp();
}
+ public String getVarPool() {
+ return varPool;
+ }
+
+ public void setVarPool(String varPool) {
+ this.varPool = varPool;
+ }
+
public ProcessDefinition getProcessDefinition() {
return processDefinition;
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 9688200..b13ca87 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -212,6 +212,11 @@ public class TaskInstance implements Serializable {
private int executorId;
/**
+ * varPool string
+ */
+ private String varPool;
+
+ /**
* executor name
*/
@TableField(exist = false)
@@ -232,7 +237,14 @@ public class TaskInstance implements Serializable {
this.executePath = executePath;
}
+ public String getVarPool() {
+ return varPool;
+ }
+ public void setVarPool(String varPool) {
+ this.varPool = varPool;
+ }
+
public ProcessInstance getProcessInstance() {
return processInstance;
}
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
index e559334..7f6ee66 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
@@ -63,7 +63,19 @@ public class TaskExecuteResponseCommand implements
Serializable {
*/
private String appIds;
+ /**
+ * varPool string
+ */
+ private String varPool;
+ public void setVarPool(String varPool) {
+ this.varPool = varPool;
+ }
+
+ public String getVarPool() {
+ return varPool;
+ }
+
public int getTaskInstanceId() {
return taskInstanceId;
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index b04b930..2633ccd 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -90,7 +90,8 @@ public class TaskResponseProcessor implements
NettyRequestProcessor {
responseCommand.getEndTime(),
responseCommand.getProcessId(),
responseCommand.getAppIds(),
- responseCommand.getTaskInstanceId());
+ responseCommand.getTaskInstanceId(),
+ responseCommand.getVarPool());
taskResponseService.addResponse(taskResponseEvent);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
index 051cc38..ba07be5 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
@@ -79,7 +79,12 @@ public class TaskResponseEvent {
*/
private Event event;
- public static TaskResponseEvent newAck(ExecutionStatus state, Date
startTime, String workerAddress, String executePath, String logPath, int
taskInstanceId){
+ /**
+ * varPool
+ */
+ private String varPool;
+
+ public static TaskResponseEvent newAck(ExecutionStatus state, Date
startTime, String workerAddress, String executePath, String logPath, int
taskInstanceId) {
TaskResponseEvent event = new TaskResponseEvent();
event.setState(state);
event.setStartTime(startTime);
@@ -91,7 +96,7 @@ public class TaskResponseEvent {
return event;
}
- public static TaskResponseEvent newResult(ExecutionStatus state, Date
endTime, int processId, String appIds, int taskInstanceId){
+ public static TaskResponseEvent newResult(ExecutionStatus state, Date
endTime, int processId, String appIds, int taskInstanceId, String varPool) {
TaskResponseEvent event = new TaskResponseEvent();
event.setState(state);
event.setEndTime(endTime);
@@ -99,9 +104,18 @@ public class TaskResponseEvent {
event.setAppIds(appIds);
event.setTaskInstanceId(taskInstanceId);
event.setEvent(Event.RESULT);
+ event.setVarPool(varPool);
return event;
}
+ public String getVarPool() {
+ return varPool;
+ }
+
+ public void setVarPool(String varPool) {
+ this.varPool = varPool;
+ }
+
public int getTaskInstanceId() {
return taskInstanceId;
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
index ba07313..6434db7 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
@@ -139,7 +139,8 @@ public class TaskResponseService {
taskResponseEvent.getEndTime(),
taskResponseEvent.getProcessId(),
taskResponseEvent.getAppIds(),
- taskResponseEvent.getTaskInstanceId());
+ taskResponseEvent.getTaskInstanceId(),
+ taskResponseEvent.getVarPool());
break;
default:
throw new IllegalArgumentException("invalid event type : " +
event);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 788b306..3c28e16 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -45,6 +45,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.common.utils.VarPoolUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -59,6 +60,7 @@ import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
+import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -651,14 +653,23 @@ public class MasterExecThread implements Runnable {
* submit post node
* @param parentNodeName parent node name
*/
+ private Map<String,Object> propToValue = new ConcurrentHashMap<String,
Object>();
private void submitPostNode(String parentNodeName){
List<String> submitTaskNodeList = parsePostNodeList(parentNodeName);
List<TaskInstance> taskInstances = new ArrayList<>();
for(String taskNode : submitTaskNodeList){
+ try {
+ VarPoolUtils.convertVarPoolToMap(propToValue,
processInstance.getVarPool());
+ } catch (ParseException e) {
+ logger.error("parse {} exception",
processInstance.getVarPool(), e);
+ throw new RuntimeException();
+ }
+ TaskNode taskNodeObject = dag.getNode(taskNode);
+ VarPoolUtils.setTaskNodeLocalParams(taskNodeObject, propToValue);
taskInstances.add(createTaskInstance(processInstance, taskNode,
- dag.getNode(taskNode)));
+ taskNodeObject));
}
// if previous node success , post node submit
@@ -999,6 +1010,8 @@ public class MasterExecThread implements Runnable {
task.getName(), task.getId(), task.getState());
// node success , post node submit
if(task.getState() == ExecutionStatus.SUCCESS){
+ processInstance.setVarPool(task.getVarPool());
+ processService.updateProcessInstance(processInstance);
completeTaskList.put(task.getName(), task);
submitPostNode(task.getName());
continue;
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 3ba4945..58f7433 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -155,6 +155,7 @@ public class TaskExecuteThread implements Runnable {
responseCommand.setEndTime(new Date());
responseCommand.setProcessId(task.getProcessId());
responseCommand.setAppIds(task.getAppIds());
+ responseCommand.setVarPool(task.getVarPool());
logger.info("task instance id : {},task final status : {}",
taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
} catch (Exception e) {
logger.error("task scheduler failure", e);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
index 3dedece..dddd1a6 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
@@ -66,6 +66,7 @@ public abstract class AbstractCommandExecutor {
*/
protected static final Pattern APPLICATION_REGEX =
Pattern.compile(Constants.APPLICATION_REGEX);
+ protected StringBuilder varPool = new StringBuilder();
/**
* process
*/
@@ -234,7 +235,10 @@ public abstract class AbstractCommandExecutor {
return result;
}
-
+ public String getVarPool() {
+ return varPool.toString();
+ }
+
/**
* cancel application
* @throws Exception exception
@@ -347,8 +351,13 @@ public abstract class AbstractCommandExecutor {
long lastFlushTime = System.currentTimeMillis();
while ((line = inReader.readLine()) != null) {
- logBuffer.add(line);
- lastFlushTime = flush(lastFlushTime);
+ if (line.startsWith("${setValue(")) {
+
varPool.append(line.substring("${setValue(".length(), line.length() - 2));
+ varPool.append("$VarPool$");
+ } else {
+ logBuffer.add(line);
+ lastFlushTime = flush(lastFlushTime);
+ }
}
} catch (Exception e) {
logger.error(e.getMessage(),e);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
index ae03932..1a66349 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
@@ -48,6 +48,11 @@ import java.util.Map;
public abstract class AbstractTask {
/**
+ * varPool string
+ */
+ protected String varPool;
+
+ /**
* taskExecutionContext
**/
TaskExecutionContext taskExecutionContext;
@@ -121,6 +126,14 @@ public abstract class AbstractTask {
logger.info(" -> {}", String.join("\n\t", logs));
}
+ public void setVarPool(String varPool) {
+ this.varPool = varPool;
+ }
+
+ public String getVarPool() {
+ return varPool;
+ }
+
/**
* get exit status code
* @return exit status code
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
index 367da80..6e561c1 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
@@ -38,103 +38,110 @@ import java.util.Map;
*/
public class PythonTask extends AbstractTask {
- /**
- * python parameters
- */
- private PythonParameters pythonParameters;
-
- /**
- * task dir
- */
- private String taskDir;
-
- /**
- * python command executor
- */
- private PythonCommandExecutor pythonCommandExecutor;
-
- /**
- * taskExecutionContext
- */
- private TaskExecutionContext taskExecutionContext;
-
- /**
- * constructor
- * @param taskExecutionContext taskExecutionContext
- * @param logger logger
- */
- public PythonTask(TaskExecutionContext taskExecutionContext, Logger logger) {
- super(taskExecutionContext, logger);
- this.taskExecutionContext = taskExecutionContext;
-
- this.pythonCommandExecutor = new PythonCommandExecutor(this::logHandle,
- taskExecutionContext,
- logger);
- }
-
- @Override
- public void init() {
- logger.info("python task params {}", taskExecutionContext.getTaskParams());
-
- pythonParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
PythonParameters.class);
-
- if (!pythonParameters.checkParameters()) {
- throw new RuntimeException("python task params is not valid");
+ /**
+ * python parameters
+ */
+ private PythonParameters pythonParameters;
+
+ /**
+ * task dir
+ */
+ private String taskDir;
+
+ /**
+ * python command executor
+ */
+ private PythonCommandExecutor pythonCommandExecutor;
+
+ /**
+ * taskExecutionContext
+ */
+ private TaskExecutionContext taskExecutionContext;
+
+ /**
+ * constructor
+ * @param taskExecutionContext taskExecutionContext
+ * @param logger logger
+ */
+ public PythonTask(TaskExecutionContext taskExecutionContext, Logger
logger) {
+ super(taskExecutionContext, logger);
+ this.taskExecutionContext = taskExecutionContext;
+
+ this.pythonCommandExecutor = new PythonCommandExecutor(this::logHandle,
+ taskExecutionContext,
+ logger);
}
- }
- @Override
- public void handle() throws Exception {
- try {
- // construct process
- CommandExecuteResult commandExecuteResult =
pythonCommandExecutor.run(buildCommand());
+ @Override
+ public void init() {
+ logger.info("python task params {}",
taskExecutionContext.getTaskParams());
- setExitStatusCode(commandExecuteResult.getExitStatusCode());
- setAppIds(commandExecuteResult.getAppIds());
- setProcessId(commandExecuteResult.getProcessId());
- }
- catch (Exception e) {
- logger.error("python task failure", e);
- setExitStatusCode(Constants.EXIT_CODE_FAILURE);
- throw e;
- }
- }
-
- @Override
- public void cancelApplication(boolean cancelApplication) throws Exception {
- // cancel process
- pythonCommandExecutor.cancelApplication();
- }
-
- /**
- * build command
- * @return raw python script
- * @throws Exception exception
- */
- private String buildCommand() throws Exception {
- String rawPythonScript =
pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
-
- // replace placeholder
- Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
- taskExecutionContext.getDefinedParams(),
- pythonParameters.getLocalParametersMap(),
- CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
- taskExecutionContext.getScheduleTime());
- if (paramsMap != null){
- rawPythonScript =
ParameterUtils.convertParameterPlaceholders(rawPythonScript,
ParamUtils.convert(paramsMap));
- }
+ pythonParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
PythonParameters.class);
- logger.info("raw python script : {}", pythonParameters.getRawScript());
- logger.info("task dir : {}", taskDir);
-
- return rawPythonScript;
- }
+ if (!pythonParameters.checkParameters()) {
+ throw new RuntimeException("python task params is not valid");
+ }
+ }
- @Override
- public AbstractParameters getParameters() {
- return pythonParameters;
- }
+ @Override
+ public void handle() throws Exception {
+ try {
+ // construct process
+ CommandExecuteResult commandExecuteResult =
pythonCommandExecutor.run(buildCommand());
+
+ setExitStatusCode(commandExecuteResult.getExitStatusCode());
+ setAppIds(commandExecuteResult.getAppIds());
+ setProcessId(commandExecuteResult.getProcessId());
+ setVarPool(pythonCommandExecutor.getVarPool());
+ }
+ catch (Exception e) {
+ logger.error("python task failure", e);
+ setExitStatusCode(Constants.EXIT_CODE_FAILURE);
+ throw e;
+ }
+ }
+ @Override
+ public void cancelApplication(boolean cancelApplication) throws Exception {
+ // cancel process
+ pythonCommandExecutor.cancelApplication();
+ }
+ /**
+ * build command
+ * @return raw python script
+ * @throws Exception exception
+ */
+ private String buildCommand() throws Exception {
+ String rawPythonScript =
pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
+
+ // replace placeholder
+ Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
+ taskExecutionContext.getDefinedParams(),
+ pythonParameters.getLocalParametersMap(),
+
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
+ taskExecutionContext.getScheduleTime());
+
+ try {
+ rawPythonScript =
VarPoolUtils.convertPythonScriptPlaceholders(rawPythonScript);
+ }
+ catch (StringIndexOutOfBoundsException e) {
+ logger.error("setShareVar field format error, raw python script :
{}", rawPythonScript);
+ }
+
+ if (paramsMap != null) {
+ rawPythonScript =
ParameterUtils.convertParameterPlaceholders(rawPythonScript,
ParamUtils.convert(paramsMap));
+ }
+
+ logger.info("raw python script : {}", pythonParameters.getRawScript());
+ logger.info("task dir : {}", taskDir);
+
+ return rawPythonScript;
+ }
+ @Override
+ public AbstractParameters getParameters() {
+ return pythonParameters;
+ }
+
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 6f64267..7344cf1 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -1464,17 +1464,20 @@ public class ProcessService {
* @param state state
* @param endTime endTime
* @param taskInstId taskInstId
+ * @param varPool varPool
*/
public void changeTaskState(ExecutionStatus state,
Date endTime,
int processId,
String appIds,
- int taskInstId) {
+ int taskInstId,
+ String varPool) {
TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId);
taskInstance.setPid(processId);
taskInstance.setAppLink(appIds);
taskInstance.setState(state);
taskInstance.setEndTime(endTime);
+ taskInstance.setVarPool(varPool);
saveTaskInstance(taskInstance);
}
diff --git a/pom.xml b/pom.xml
index c8cb5a9..e895b01 100644
--- a/pom.xml
+++ b/pom.xml
@@ -785,6 +785,7 @@
<include>**/common/utils/StringTest.java</include>
<include>**/common/utils/StringUtilsTest.java</include>
<include>**/common/utils/TaskParametersUtilsTest.java</include>
+
<include>**/common/utils/VarPoolUtilsTest.java</include>
<include>**/common/utils/HadoopUtilsTest.java</include>
<include>**/common/utils/HttpUtilsTest.java</include>
<include>**/common/utils/KerberosHttpClientTest.java</include>
diff --git a/sql/dolphinscheduler-postgre.sql b/sql/dolphinscheduler-postgre.sql
index 5ae37e1..e2f5ebd 100644
--- a/sql/dolphinscheduler-postgre.sql
+++ b/sql/dolphinscheduler-postgre.sql
@@ -377,6 +377,7 @@ CREATE TABLE t_ds_process_instance (
worker_group varchar(64) ,
timeout int DEFAULT '0' ,
tenant_id int NOT NULL DEFAULT '-1' ,
+ var_pool text ,
PRIMARY KEY (id)
) ;
create index process_instance_index on t_ds_process_instance
(process_definition_id,id);
@@ -595,6 +596,7 @@ CREATE TABLE t_ds_task_instance (
executor_id int DEFAULT NULL ,
first_submit_time timestamp DEFAULT NULL ,
delay_time int DEFAULT '0' ,
+ var_pool text ,
PRIMARY KEY (id)
) ;
diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql
index 61e6975..9039a19 100644
--- a/sql/dolphinscheduler_mysql.sql
+++ b/sql/dolphinscheduler_mysql.sql
@@ -487,6 +487,7 @@ CREATE TABLE `t_ds_process_instance` (
`worker_group` varchar(64) DEFAULT NULL COMMENT 'worker group id',
`timeout` int(11) DEFAULT '0' COMMENT 'time out',
`tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id',
+ `var_pool` longtext COMMENT 'var_pool',
PRIMARY KEY (`id`),
KEY `process_instance_index` (`process_definition_id`,`id`) USING BTREE,
KEY `start_time_index` (`start_time`) USING BTREE
@@ -737,6 +738,7 @@ CREATE TABLE `t_ds_task_instance` (
`executor_id` int(11) DEFAULT NULL,
`first_submit_time` datetime DEFAULT NULL COMMENT 'task first submit time',
`delay_time` int(4) DEFAULT '0' COMMENT 'task delay execution time',
+ `var_pool` longtext COMMENT 'var_pool',
PRIMARY KEY (`id`),
KEY `process_instance_id` (`process_instance_id`) USING BTREE,
KEY `task_instance_index` (`process_definition_id`,`process_instance_id`)
USING BTREE,
diff --git a/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql
b/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql
index 4348827..ae66da9 100644
--- a/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql
+++ b/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql
@@ -56,6 +56,46 @@ delimiter ;
CALL uc_dolphin_T_t_ds_task_instance_A_delay_time();
DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_A_delay_time;
+-- uc_dolphin_T_t_ds_task_instance_A_var_pool
+drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_instance_A_var_pool;
+delimiter d//
+CREATE PROCEDURE uc_dolphin_T_t_ds_task_instance_A_var_pool()
+ BEGIN
+ IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
+ WHERE TABLE_NAME='t_ds_task_instance'
+ AND TABLE_SCHEMA=(SELECT DATABASE())
+ AND COLUMN_NAME ='var_pool')
+ THEN
+ ALTER TABLE t_ds_task_instance ADD `var_pool` longtext NULL;
+ END IF;
+ END;
+
+d//
+
+delimiter ;
+CALL uc_dolphin_T_t_ds_task_instance_A_var_pool();
+DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_A_var_pool;
+
+-- uc_dolphin_T_t_ds_process_instance_A_var_pool
+drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_process_instance_A_var_pool;
+delimiter d//
+CREATE PROCEDURE uc_dolphin_T_t_ds_process_instance_A_var_pool()
+ BEGIN
+ IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
+ WHERE TABLE_NAME='t_ds_process_instance'
+ AND TABLE_SCHEMA=(SELECT DATABASE())
+ AND COLUMN_NAME ='var_pool')
+ THEN
+ ALTER TABLE t_ds_process_instance ADD `var_pool` longtext NULL;
+ END IF;
+ END;
+
+d//
+
+delimiter ;
+CALL uc_dolphin_T_t_ds_process_instance_A_var_pool();
+DROP PROCEDURE uc_dolphin_T_t_ds_process_instance_A_var_pool;
+
-- uc_dolphin_T_t_ds_process_definition_A_modify_by
drop PROCEDURE if EXISTS ct_dolphin_T_t_ds_process_definition_version;
delimiter d//
diff --git a/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql
b/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql
index e276761..3351cac 100644
--- a/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql
+++ b/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql
@@ -51,6 +51,42 @@ delimiter ;
SELECT uc_dolphin_T_t_ds_task_instance_A_delay_time();
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_A_delay_time();
+-- uc_dolphin_T_t_ds_process_instance_A_var_pool
+delimiter d//
+CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_process_instance_A_var_pool()
RETURNS void AS $$
+BEGIN
+ IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
+ WHERE TABLE_NAME='t_ds_process_instance'
+ AND COLUMN_NAME ='var_pool')
+ THEN
+ ALTER TABLE t_ds_process_instance ADD COLUMN var_pool text;
+ END IF;
+END;
+$$ LANGUAGE plpgsql;
+d//
+
+delimiter ;
+SELECT uc_dolphin_T_t_ds_process_instance_A_var_pool();
+DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_process_instance_A_var_pool();
+
+-- uc_dolphin_T_t_ds_task_instance_A_var_pool
+delimiter d//
+CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_task_instance_A_var_pool()
RETURNS void AS $$
+BEGIN
+ IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
+ WHERE TABLE_NAME='t_ds_task_instance'
+ AND COLUMN_NAME ='var_pool')
+ THEN
+ ALTER TABLE t_ds_task_instance ADD COLUMN var_pool text;
+ END IF;
+END;
+$$ LANGUAGE plpgsql;
+d//
+
+delimiter ;
+SELECT uc_dolphin_T_t_ds_task_instance_A_var_pool();
+DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_A_var_pool();
+
-- uc_dolphin_T_t_ds_process_definition_A_modify_by
delimiter d//
CREATE OR REPLACE FUNCTION ct_dolphin_T_t_ds_process_definition_version()
RETURNS void AS $$