This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch json_split_two
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/json_split_two by this push:
new 57414c4 [Feature][JsonSplit-api] merging from dev to json_split_two
(#5712)
57414c4 is described below
commit 57414c4df70d8931157d4b4608170a4784193c2a
Author: JinyLeeChina <[email protected]>
AuthorDate: Tue Jun 29 22:55:01 2021 +0800
[Feature][JsonSplit-api] merging from dev to json_split_two (#5712)
* [BUG-#5678][Registry]fix registry init node miss (#5686)
* [Improvement][UI] Update the update time after the user information is
successfully modified (#5684)
* improve
Editing the user info succeeded, but the update time was not refreshed to the latest value.
* Improved shell task execution result log information, adding
process.waitFor() and process.exitValue() information to the original log
(#5691)
Co-authored-by: shenglm <[email protected]>
* [Feature-#5565][Master Worker-Server] Global Param passed by sense
dependencies (#5603)
* add globalParams new plan with varPool
* add unit test
* add python task varPoolParams
Co-authored-by: wangxj <wangxj31>
* Issue robot translation judgment changed to Chinese (#5694)
Co-authored-by: chenxingchun <[email protected]>
* the update function should use post instead of get (#5703)
* enhance form verify (#5696)
* checkState only supports %s not {} (#5711)
* [Fix-5701]When deleting a user, the accessToken associated with the user
should also be deleted (#5697)
* update
* fix the codestyle error
* fix the compile error
* support rollback
Co-authored-by: Kirs <[email protected]>
Co-authored-by: kyoty <[email protected]>
Co-authored-by: ji04xiaogang <[email protected]>
Co-authored-by: shenglm <[email protected]>
Co-authored-by: wangxj3 <[email protected]>
Co-authored-by: xingchun-chen <[email protected]>
Co-authored-by: chenxingchun <[email protected]>
Co-authored-by: JinyLeeChina <[email protected]>
---
.github/actions/translate-on-issue | 1 -
.gitmodules | 3 -
.../alert/plugin/AlertPluginManager.java | 2 +-
.../controller/AlertPluginInstanceController.java | 2 +-
.../api/service/impl/UsersServiceImpl.java | 7 +
.../api/service/UsersServiceTest.java | 18 +-
.../dolphinscheduler/common/enums/DataType.java | 3 +-
.../common/task/AbstractParameters.java | 182 ++++++++++++++++-----
.../common/task/shell/ShellParameters.java | 60 +++----
.../common/task/sql/SqlParameters.java | 65 ++++++++
.../common/utils/VarPoolUtils.java | 3 +-
.../common/task/EntityTestUtils.java | 2 +-
.../common/task/SqlParametersTest.java | 30 +++-
.../dao/mapper/AccessTokenMapper.java | 14 +-
.../dao/mapper/AccessTokenMapper.xml | 4 +
.../dao/mapper/AccessTokenMapperTest.java | 47 ++++--
.../remote/command/TaskExecuteResponseCommand.java | 12 --
.../builder/TaskExecutionContextBuilder.java | 14 +-
.../server/entity/TaskExecutionContext.java | 13 ++
.../master/processor/TaskResponseProcessor.java | 3 +-
.../master/processor/queue/TaskResponseEvent.java | 15 +-
.../processor/queue/TaskResponseService.java | 3 +-
.../master/registry/MasterRegistryClient.java | 16 +-
.../server/master/registry/ServerNodeManager.java | 7 +-
.../server/master/runner/MasterExecThread.java | 90 ++++++----
.../dolphinscheduler/server/utils/ParamUtils.java | 8 +-
.../worker/registry/WorkerRegistryClient.java | 3 +-
.../server/worker/runner/TaskExecuteThread.java | 7 +-
.../worker/task/AbstractCommandExecutor.java | 20 +--
.../server/worker/task/AbstractTask.java | 28 +---
.../server/worker/task/datax/DataxTask.java | 1 +
.../server/worker/task/flink/FlinkTask.java | 1 +
.../server/worker/task/http/HttpTask.java | 1 +
.../server/worker/task/mr/MapReduceTask.java | 1 +
.../worker/task/procedure/ProcedureTask.java | 1 +
.../server/worker/task/python/PythonTask.java | 3 +-
.../server/worker/task/shell/ShellTask.java | 17 +-
.../server/worker/task/spark/SparkTask.java | 1 +
.../server/worker/task/sql/SqlTask.java | 47 +++---
.../server/worker/task/sqoop/SqoopTask.java | 1 +
.../server/master/MasterExecThreadTest.java | 48 ++++++
.../dolphinscheduler/server/master/ParamsTest.java | 33 ++--
.../processor/queue/TaskResponseServiceTest.java | 3 +-
.../server/utils/ParamUtilsTest.java | 42 +++--
.../worker/processor/TaskCallbackServiceTest.java | 2 -
.../worker/processor/TaskKillProcessorTest.java | 6 +-
.../worker/registry/WorkerRegistryClientTest.java | 2 -
.../worker/runner/TaskExecuteThreadTest.java | 3 +-
.../worker/task/AbstractCommandExecutorTest.java | 53 ------
.../server/worker/task/ShellTaskReturnTest.java | 11 --
.../server/worker/task/TaskManagerTest.java | 5 -
.../server/worker/task/TaskParamsTest.java | 77 +++++++++
.../server/worker/task/shell/ShellTaskTest.java | 12 +-
.../server/worker/task/sql/SqlTaskTest.java | 2 +
.../service/process/ProcessService.java | 55 ++-----
.../service/registry/RegistryCenter.java | 40 +----
.../service/registry/RegistryClient.java | 8 +-
.../service/process/ProcessServiceTest.java | 15 ++
.../spi/plugin/DolphinPluginLoader.java | 2 +-
.../pages/warningGroups/_source/createWarning.vue | 4 +
.../_source/createWarningInstance.vue | 4 +
.../home/pages/user/pages/account/_source/info.vue | 7 +-
.../src/js/conf/home/store/security/actions.js | 2 +-
.../src/js/module/i18n/locale/en_US.js | 2 +
.../src/js/module/i18n/locale/zh_CN.js | 2 +
pom.xml | 2 +-
66 files changed, 738 insertions(+), 460 deletions(-)
diff --git a/.github/actions/translate-on-issue
b/.github/actions/translate-on-issue
deleted file mode 160000
index 959b66f..0000000
--- a/.github/actions/translate-on-issue
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 959b66feb4231b08e8251422ac6d469cdc03d140
diff --git a/.gitmodules b/.gitmodules
index d5c455f..11414db 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -21,6 +21,3 @@
[submodule ".github/actions/lable-on-issue"]
path = .github/actions/lable-on-issue
url = https://github.com/xingchun-chen/labeler
-[submodule ".github/actions/translate-on-issue"]
- path = .github/actions/translate-on-issue
- url = https://github.com/xingchun-chen/translation-helper.git
diff --git
a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/plugin/AlertPluginManager.java
b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/plugin/AlertPluginManager.java
index 5788cf9..4fbe2bd 100644
---
a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/plugin/AlertPluginManager.java
+++
b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/plugin/AlertPluginManager.java
@@ -76,7 +76,7 @@ public class AlertPluginManager extends
AbstractDolphinPluginManager {
requireNonNull(name, "name is null");
AlertChannelFactory alertChannelFactory =
alertChannelFactoryMap.get(name);
- checkState(alertChannelFactory != null, "Alert Plugin {} is not
registered", name);
+ checkState(alertChannelFactory != null, "Alert Plugin %s is not
registered", name);
try (ThreadContextClassLoader ignored = new
ThreadContextClassLoader(alertChannelFactory.getClass().getClassLoader())) {
AlertChannel alertChannel = alertChannelFactory.create();
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/AlertPluginInstanceController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/AlertPluginInstanceController.java
index 42c5536..346e041 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/AlertPluginInstanceController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/AlertPluginInstanceController.java
@@ -108,7 +108,7 @@ public class AlertPluginInstanceController extends
BaseController {
@ApiImplicitParam(name = "instanceName", value =
"ALERT_PLUGIN_INSTANCE_NAME", required = true, dataType = "String", example =
"DING TALK"),
@ApiImplicitParam(name = "pluginInstanceParams", value =
"ALERT_PLUGIN_INSTANCE_PARAMS", required = true, dataType = "String", example =
"ALERT_PLUGIN_INSTANCE_PARAMS")
})
- @GetMapping(value = "/update")
+ @PostMapping(value = "/update")
@ResponseStatus(HttpStatus.OK)
@ApiException(UPDATE_ALERT_PLUGIN_INSTANCE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
index 23a6e89..6c6e4d5 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
@@ -43,6 +43,7 @@ import org.apache.dolphinscheduler.dao.entity.ResourcesUser;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UDFUser;
import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.AccessTokenMapper;
import org.apache.dolphinscheduler.dao.mapper.AlertGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
@@ -84,6 +85,9 @@ public class UsersServiceImpl extends BaseServiceImpl
implements UsersService {
private static final Logger logger =
LoggerFactory.getLogger(UsersServiceImpl.class);
@Autowired
+ private AccessTokenMapper accessTokenMapper;
+
+ @Autowired
private UserMapper userMapper;
@Autowired
@@ -482,6 +486,7 @@ public class UsersServiceImpl extends BaseServiceImpl
implements UsersService {
* @throws Exception exception when operate hdfs
*/
@Override
+ @Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> deleteUserById(User loginUser, int id) throws
IOException {
Map<String, Object> result = new HashMap<>();
//only admin can operate
@@ -514,6 +519,8 @@ public class UsersServiceImpl extends BaseServiceImpl
implements UsersService {
}
}
+ accessTokenMapper.deleteAccessTokenByUserId(id);
+
userMapper.deleteById(id);
putMsg(result, Status.SUCCESS);
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
index f2df3c1..bc0a922 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
@@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.AccessTokenMapper;
import org.apache.dolphinscheduler.dao.mapper.AlertGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
@@ -80,6 +81,9 @@ public class UsersServiceTest {
private UserMapper userMapper;
@Mock
+ private AccessTokenMapper accessTokenMapper;
+
+ @Mock
private TenantMapper tenantMapper;
@Mock
@@ -221,7 +225,6 @@ public class UsersServiceTest {
Assert.assertEquals(user.getId(), userExistId);
}
-
@Test
public void testQueryUserList() {
User user = new User();
@@ -265,13 +268,13 @@ public class UsersServiceTest {
String userPassword = "userTest0001";
try {
//user not exist
- Map<String, Object> result =
usersService.updateUser(getLoginUser(),
0,userName,userPassword,"[email protected]",1,"13457864543","queue", 1);
+ Map<String, Object> result =
usersService.updateUser(getLoginUser(), 0, userName, userPassword,
"[email protected]", 1, "13457864543", "queue", 1);
Assert.assertEquals(Status.USER_NOT_EXIST,
result.get(Constants.STATUS));
logger.info(result.toString());
//success
when(userMapper.selectById(1)).thenReturn(getUser());
- result = usersService.updateUser(getLoginUser(),
1,userName,userPassword,"[email protected]",1,"13457864543","queue", 1);
+ result = usersService.updateUser(getLoginUser(), 1, userName,
userPassword, "[email protected]", 1, "13457864543", "queue", 1);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
} catch (Exception e) {
@@ -286,7 +289,7 @@ public class UsersServiceTest {
try {
when(userMapper.queryTenantCodeByUserId(1)).thenReturn(getUser());
when(userMapper.selectById(1)).thenReturn(getUser());
-
+ when(accessTokenMapper.deleteAccessTokenByUserId(1)).thenReturn(0);
//no operate
Map<String, Object> result =
usersService.deleteUserById(loginUser, 3);
logger.info(result.toString());
@@ -356,7 +359,6 @@ public class UsersServiceTest {
}
-
@Test
public void testGrantUDFFunction() {
String udfIds = "100000,120000";
@@ -398,7 +400,7 @@ public class UsersServiceTest {
}
- private User getLoginUser(){
+ private User getLoginUser() {
User loginUser = new User();
loginUser.setId(1);
loginUser.setUserType(UserType.ADMIN_USER);
@@ -431,7 +433,6 @@ public class UsersServiceTest {
Assert.assertEquals("userTest0001", tempUser.getUserName());
}
-
@Test
public void testQueryAllGeneralUsers() {
User loginUser = new User();
@@ -478,7 +479,6 @@ public class UsersServiceTest {
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
-
@Test
public void testAuthorizedUser() {
User loginUser = new User();
@@ -535,7 +535,6 @@ public class UsersServiceTest {
}
}
-
@Test
public void testActivateUser() {
User user = new User();
@@ -618,7 +617,6 @@ public class UsersServiceTest {
return user;
}
-
/**
* get user
*/
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DataType.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DataType.java
index eda00d8..2b0930d 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DataType.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DataType.java
@@ -30,6 +30,7 @@ public enum DataType {
* 6 time, "HH:MM:SS"
* 7 time stamp
* 8 Boolean
+ * 9 list <String>
*/
- VARCHAR,INTEGER,LONG,FLOAT,DOUBLE,DATE,TIME,TIMESTAMP,BOOLEAN
+ VARCHAR,INTEGER,LONG,FLOAT,DOUBLE,DATE,TIME,TIMESTAMP,BOOLEAN,LIST
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java
index 929516c..686642d 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java
@@ -16,55 +16,163 @@
*/
package org.apache.dolphinscheduler.common.task;
+import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+
/**
* job params related class
*/
public abstract class AbstractParameters implements IParameters {
- @Override
- public abstract boolean checkParameters();
-
- @Override
- public abstract List<ResourceInfo> getResourceFilesList();
-
- /**
- * local parameters
- */
- public List<Property> localParams;
-
- /**
- * get local parameters list
- * @return Property list
- */
- public List<Property> getLocalParams() {
- return localParams;
- }
-
- public void setLocalParams(List<Property> localParams) {
- this.localParams = localParams;
- }
-
- /**
- * get local parameters map
- * @return parameters map
- */
- public Map<String,Property> getLocalParametersMap() {
- if (localParams != null) {
- Map<String,Property> localParametersMaps = new LinkedHashMap<>();
-
- for (Property property : localParams) {
- localParametersMaps.put(property.getProp(),property);
+ @Override
+ public abstract boolean checkParameters();
+
+ @Override
+ public abstract List<ResourceInfo> getResourceFilesList();
+
+ /**
+ * local parameters
+ */
+ public List<Property> localParams;
+
+ /**
+ * var pool
+ */
+ public List<Property> varPool;
+
+ /**
+ * get local parameters list
+ *
+ * @return Property list
+ */
+ public List<Property> getLocalParams() {
+ return localParams;
+ }
+
+ public void setLocalParams(List<Property> localParams) {
+ this.localParams = localParams;
+ }
+
+ /**
+ * get local parameters map
+ *
+ * @return parameters map
+ */
+ public Map<String, Property> getLocalParametersMap() {
+ if (localParams != null) {
+ Map<String, Property> localParametersMaps = new LinkedHashMap<>();
+
+ for (Property property : localParams) {
+ localParametersMaps.put(property.getProp(), property);
+ }
+ return localParametersMaps;
+ }
+ return null;
+ }
+
+ /**
+ * get varPool map
+ *
+ * @return parameters map
+ */
+ public Map<String, Property> getVarPoolMap() {
+ if (varPool != null) {
+ Map<String, Property> varPoolMap = new LinkedHashMap<>();
+ for (Property property : varPool) {
+ varPoolMap.put(property.getProp(), property);
+ }
+ return varPoolMap;
+ }
+ return null;
+ }
+
+ public List<Property> getVarPool() {
+ return varPool;
+ }
+
+ public void setVarPool(String varPool) {
+ if (StringUtils.isEmpty(varPool)) {
+ this.varPool = new ArrayList<>();
+ } else {
+ this.varPool = JSONUtils.toList(varPool, Property.class);
+ }
+ }
+
+ public void dealOutParam(String result) {
+ if (CollectionUtils.isEmpty(localParams)) {
+ return;
+ }
+ List<Property> outProperty = getOutProperty(localParams);
+ if (CollectionUtils.isEmpty(outProperty)) {
+ return;
+ }
+ if (StringUtils.isEmpty(result)) {
+ varPool.addAll(outProperty);
+ return;
+ }
+ Map<String, String> taskResult = getMapByString(result);
+ if (taskResult == null || taskResult.size() == 0) {
+ return;
+ }
+ for (Property info : outProperty) {
+ info.setValue(taskResult.get(info.getProp()));
+ varPool.add(info);
+ }
+ }
+
+ public List<Property> getOutProperty(List<Property> params) {
+ if (CollectionUtils.isEmpty(params)) {
+ return new ArrayList<>();
+ }
+ List<Property> result = new ArrayList<>();
+ for (Property info : params) {
+ if (info.getDirect() == Direct.OUT) {
+ result.add(info);
+ }
+ }
+ return result;
+ }
+
+ public List<Map<String, String>> getListMapByString(String json) {
+ List<Map<String, String>> allParams = new ArrayList<>();
+ ArrayNode paramsByJson = JSONUtils.parseArray(json);
+ Iterator<JsonNode> listIterator = paramsByJson.iterator();
+ while (listIterator.hasNext()) {
+ Map<String, String> param =
JSONUtils.toMap(listIterator.next().toString(), String.class, String.class);
+ allParams.add(param);
+ }
+ return allParams;
+ }
+
+ /**
+ * shell's result format is key=value$VarPool$key=value$VarPool$
+ * @param result
+ * @return
+ */
+ public static Map<String, String> getMapByString(String result) {
+ String[] formatResult = result.split("\\$VarPool\\$");
+ Map<String, String> format = new HashMap<>();
+ for (String info : formatResult) {
+ if (StringUtils.isNotEmpty(info) && info.contains("=")) {
+ String[] keyValue = info.split("=");
+ format.put(keyValue[0], keyValue[1]);
+ }
}
- return localParametersMaps;
- }
- return null;
- }
+ return format;
+ }
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/shell/ShellParameters.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/shell/ShellParameters.java
index e11e596..7388cd3 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/shell/ShellParameters.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/shell/ShellParameters.java
@@ -14,52 +14,52 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.task.shell;
+package org.apache.dolphinscheduler.common.task.shell;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import java.util.List;
-import java.util.stream.Collectors;
/**
* shell parameters
*/
public class ShellParameters extends AbstractParameters {
- /**
- * shell script
- */
- private String rawScript;
+ /**
+ * shell script
+ */
+ private String rawScript;
+
+ /**
+ * resource list
+ */
+ private List<ResourceInfo> resourceList;
- /**
- * resource list
- */
- private List<ResourceInfo> resourceList;
+ public String getRawScript() {
+ return rawScript;
+ }
- public String getRawScript() {
- return rawScript;
- }
+ public void setRawScript(String rawScript) {
+ this.rawScript = rawScript;
+ }
- public void setRawScript(String rawScript) {
- this.rawScript = rawScript;
- }
+ public List<ResourceInfo> getResourceList() {
+ return resourceList;
+ }
- public List<ResourceInfo> getResourceList() {
- return resourceList;
- }
+ public void setResourceList(List<ResourceInfo> resourceList) {
+ this.resourceList = resourceList;
+ }
- public void setResourceList(List<ResourceInfo> resourceList) {
- this.resourceList = resourceList;
- }
+ @Override
+ public boolean checkParameters() {
+ return rawScript != null && !rawScript.isEmpty();
+ }
- @Override
- public boolean checkParameters() {
- return rawScript != null && !rawScript.isEmpty();
- }
+ @Override
+ public List<ResourceInfo> getResourceFilesList() {
+ return resourceList;
+ }
- @Override
- public List<ResourceInfo> getResourceFilesList() {
- return resourceList;
- }
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java
index a83cd64..59259a5 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java
@@ -17,12 +17,19 @@
package org.apache.dolphinscheduler.common.task.sql;
+import org.apache.dolphinscheduler.common.enums.DataType;
+import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
/**
* Sql/Hql parameter
@@ -94,6 +101,16 @@ public class SqlParameters extends AbstractParameters {
*/
private String title;
+ private int limit;
+
+ public int getLimit() {
+ return limit;
+ }
+
+ public void setLimit(int limit) {
+ this.limit = limit;
+ }
+
public String getType() {
return type;
}
@@ -209,6 +226,53 @@ public class SqlParameters extends AbstractParameters {
}
@Override
+ public void dealOutParam(String result) {
+ if (CollectionUtils.isEmpty(localParams)) {
+ return;
+ }
+ List<Property> outProperty = getOutProperty(localParams);
+ if (CollectionUtils.isEmpty(outProperty)) {
+ return;
+ }
+ if (StringUtils.isEmpty(result)) {
+ varPool.addAll(outProperty);
+ return;
+ }
+ List<Map<String, String>> sqlResult = getListMapByString(result);
+ if (CollectionUtils.isEmpty(sqlResult)) {
+ return;
+ }
+ //if sql return more than one line
+ if (sqlResult.size() > 1) {
+ Map<String, List<String>> sqlResultFormat = new HashMap<>();
+ //init sqlResultFormat
+ Set<String> keySet = sqlResult.get(0).keySet();
+ for (String key : keySet) {
+ sqlResultFormat.put(key, new ArrayList<>());
+ }
+ for (Map<String, String> info : sqlResult) {
+ for (String key : info.keySet()) {
+
sqlResultFormat.get(key).add(String.valueOf(info.get(key)));
+ }
+ }
+ for (Property info : outProperty) {
+ if (info.getType() == DataType.LIST) {
+
info.setValue(JSONUtils.toJsonString(sqlResultFormat.get(info.getProp())));
+ varPool.add(info);
+ }
+ }
+ } else {
+ //result only one line
+ Map<String, String> firstRow = sqlResult.get(0);
+ for (Property info : outProperty) {
+ info.setValue(String.valueOf(firstRow.get(info.getProp())));
+ varPool.add(info);
+ }
+ }
+
+ }
+
+ @Override
public String toString() {
return "SqlParameters{"
+ "type='" + type + '\''
@@ -217,6 +281,7 @@ public class SqlParameters extends AbstractParameters {
+ ", sqlType=" + sqlType
+ ", sendEmail=" + sendEmail
+ ", displayRows=" + displayRows
+ + ", limit=" + limit
+ ", udfs='" + udfs + '\''
+ ", showType='" + showType + '\''
+ ", connParams='" + connParams + '\''
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
index cd300e3..f286300 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import java.text.ParseException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Map;
public class VarPoolUtils {
@@ -71,7 +70,7 @@ public class VarPoolUtils {
if (kvs.length == 2) {
propToValue.put(kvs[0], kvs[1]);
} else {
- throw new ParseException(kv, 2);
+ return;
}
}
}
diff --git
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/EntityTestUtils.java
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/EntityTestUtils.java
index 5d867bc..8e9b451 100644
---
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/EntityTestUtils.java
+++
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/EntityTestUtils.java
@@ -32,7 +32,7 @@ public class EntityTestUtils {
static {
OBJECT_MAP.put("java.lang.Long", 1L);
- OBJECT_MAP.put("java.lang.String", "test");
+ OBJECT_MAP.put("java.lang.String",
"[{\"direct\":\"OUT\",\"prop\":\"percentage5\",\"type\":\"VARCHAR\",\"value\":\"qwe\"}]");
OBJECT_MAP.put("java.lang.Integer", 1);
OBJECT_MAP.put("int", 1);
OBJECT_MAP.put("long", 1L);
diff --git
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/SqlParametersTest.java
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/SqlParametersTest.java
index 6fc4d6c..17e95cf 100644
---
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/SqlParametersTest.java
+++
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/SqlParametersTest.java
@@ -17,9 +17,17 @@
package org.apache.dolphinscheduler.common.task;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.dolphinscheduler.common.enums.DataType;
+import org.apache.dolphinscheduler.common.enums.Direct;
+import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import java.util.ArrayList;
+import java.util.List;
+
import org.junit.Assert;
import org.junit.Test;
@@ -38,6 +46,14 @@ public class SqlParametersTest {
@Test
public void testSqlParameters() {
+ List<Property> properties = new ArrayList<>();
+ Property property = new Property();
+ property.setProp("test1");
+ property.setDirect(Direct.OUT);
+ property.setType(DataType.VARCHAR);
+ property.setValue("test1");
+ properties.add(property);
+
SqlParameters sqlParameters = new SqlParameters();
Assert.assertTrue(CollectionUtils.isEmpty(sqlParameters.getResourceFilesList()));
@@ -63,6 +79,18 @@ public class SqlParametersTest {
Assert.assertEquals(title, sqlParameters.getTitle());
Assert.assertEquals(groupId, sqlParameters.getGroupId());
- Assert.assertTrue(sqlParameters.checkParameters());
+ String sqlResult =
"[{\"id\":6,\"test1\":\"6\"},{\"id\":70002,\"test1\":\"+1\"}]";
+ String sqlResult1 = "[{\"id\":6,\"test1\":\"6\"}]";
+ sqlParameters.setLocalParams(properties);
+ sqlParameters.varPool = new ArrayList<>();
+ sqlParameters.dealOutParam(sqlResult1);
+ assertNotNull(sqlParameters.getVarPool().get(0));
+
+ property.setType(DataType.LIST);
+ properties.clear();
+ properties.add(property);
+ sqlParameters.setLocalParams(properties);
+ sqlParameters.dealOutParam(sqlResult);
+ assertNotNull(sqlParameters.getVarPool().get(0));
}
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapper.java
index 2e82744..472ba35 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapper.java
@@ -14,13 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.AccessToken;
+
+import org.apache.ibatis.annotations.Param;
+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import org.apache.ibatis.annotations.Param;
/**
* accesstoken mapper interface
@@ -30,6 +33,7 @@ public interface AccessTokenMapper extends
BaseMapper<AccessToken> {
/**
* access token page
+ *
* @param page page
* @param userName userName
* @param userId userId
@@ -39,4 +43,12 @@ public interface AccessTokenMapper extends
BaseMapper<AccessToken> {
@Param("userName") String
userName,
@Param("userId") int userId
);
+
+ /**
+ * delete by userId
+ *
+ * @param userId userId
+ * @return delete result
+ */
+ int deleteAccessTokenByUserId(@Param("userId") int userId);
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapper.xml
index 02fc952..35312fb 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapper.xml
@@ -31,4 +31,8 @@
</if>
order by t.update_time desc
</select>
+ <delete id="deleteAccessTokenByUserId">
+ delete from t_ds_access_token
+ where user_id = #{userId}
+ </delete>
</mapper>
diff --git
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapperTest.java
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapperTest.java
index 30c8cdc..31958a7 100644
---
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapperTest.java
+++
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapperTest.java
@@ -16,12 +16,22 @@
*/
package org.apache.dolphinscheduler.dao.mapper;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.AccessToken;
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.dao.entity.User;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -31,14 +41,8 @@ import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
-import javax.annotation.Resource;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.junit.Assert.*;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* AccessToken mapper test
@@ -57,8 +61,6 @@ public class AccessTokenMapperTest {
/**
* test insert
- *
- * @throws Exception
*/
@Test
public void testInsert() throws Exception {
@@ -68,6 +70,27 @@ public class AccessTokenMapperTest {
assertThat(accessToken.getId(), greaterThan(0));
}
+ /**
+ * test delete AccessToken By UserId: the reported delete count must equal the number of createAccessToken calls that succeeded
+ */
+ @Test
+ public void testDeleteAccessTokenByUserId() throws Exception {
+ Integer userId = 1;
+ int insertCount = 0; // counts the createAccessToken calls that did not throw
+
+ for (int i = 0; i < 10; i++) {
+ try {
+ createAccessToken(userId);
+ insertCount++;
+ } catch (Exception e) { // a failed call is simply not counted
+ e.printStackTrace(); // NOTE(review): errors are only printed, they never fail the test - confirm this is intended
+ }
+ }
+
+ int deleteCount = accessTokenMapper.deleteAccessTokenByUserId(userId);
+ Assert.assertEquals(insertCount, deleteCount); // NOTE(review): assumes userId 1 owns no tokens before the test - confirm
+ }
+
/**
* test select by id
diff --git
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
index 93cc3ea..de5b82c 100644
---
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
+++
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
@@ -68,10 +68,6 @@ public class TaskExecuteResponseCommand implements
Serializable {
* varPool string
*/
private String varPool;
- /**
- * task return result
- */
- private String result;
public void setVarPool(String varPool) {
this.varPool = varPool;
@@ -143,12 +139,4 @@ public class TaskExecuteResponseCommand implements
Serializable {
+ ", appIds='" + appIds + '\''
+ '}';
}
-
- public String getResult() {
- return result;
- }
-
- public void setResult(String result) {
- this.result = result;
- }
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index da46e4d..c1cca3a 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -21,8 +21,15 @@ import static
org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UN
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
-import org.apache.dolphinscheduler.dao.entity.*;
-import org.apache.dolphinscheduler.server.entity.*;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
/**
* TaskExecutionContext builder
@@ -41,7 +48,7 @@ public class TaskExecutionContextBuilder {
* @param taskInstance taskInstance
* @return TaskExecutionContextBuilder
*/
- public TaskExecutionContextBuilder
buildTaskInstanceRelatedInfo(TaskInstance taskInstance){
+ public TaskExecutionContextBuilder
buildTaskInstanceRelatedInfo(TaskInstance taskInstance) {
taskExecutionContext.setTaskInstanceId(taskInstance.getId());
taskExecutionContext.setTaskName(taskInstance.getName());
taskExecutionContext.setFirstSubmitTime(taskInstance.getFirstSubmitTime());
@@ -52,6 +59,7 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setHost(taskInstance.getHost());
taskExecutionContext.setResources(taskInstance.getResources());
taskExecutionContext.setDelayTime(taskInstance.getDelayTime());
+ taskExecutionContext.setVarPool(taskInstance.getVarPool());
return this;
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
index 8490849..7a47107 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
@@ -217,6 +217,11 @@ public class TaskExecutionContext implements Serializable {
private SqoopTaskExecutionContext sqoopTaskExecutionContext;
/**
+ * taskInstance varPool
+ */
+ private String varPool;
+
+ /**
* procedure TaskExecutionContext
*/
private ProcedureTaskExecutionContext procedureTaskExecutionContext;
@@ -556,4 +561,12 @@ public class TaskExecutionContext implements Serializable {
+ ", procedureTaskExecutionContext=" +
procedureTaskExecutionContext
+ '}';
}
+
+ public String getVarPool() { // returns the taskInstance varPool string held by this context
+ return varPool;
+ }
+
+ public void setVarPool(String varPool) { // stores the taskInstance varPool string in this context
+ this.varPool = varPool;
+ }
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index 186c4f3..c307b2c 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -80,8 +80,7 @@ public class TaskResponseProcessor implements
NettyRequestProcessor {
responseCommand.getAppIds(),
responseCommand.getTaskInstanceId(),
responseCommand.getVarPool(),
- channel,
- responseCommand.getResult()
+ channel
);
taskResponseService.addResponse(taskResponseEvent);
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
index 9789bcc..05466e8 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
@@ -92,10 +92,6 @@ public class TaskResponseEvent {
* channel
*/
private Channel channel;
- /**
- * task return result
- */
- private String result;
public static TaskResponseEvent newAck(ExecutionStatus state,
Date startTime,
@@ -122,8 +118,7 @@ public class TaskResponseEvent {
String appIds,
int taskInstanceId,
String varPool,
- Channel channel,
- String result) {
+ Channel channel) {
TaskResponseEvent event = new TaskResponseEvent();
event.setState(state);
event.setEndTime(endTime);
@@ -133,7 +128,6 @@ public class TaskResponseEvent {
event.setEvent(Event.RESULT);
event.setVarPool(varPool);
event.setChannel(channel);
- event.setResult(result);
return event;
}
@@ -233,11 +227,4 @@ public class TaskResponseEvent {
this.channel = channel;
}
- public String getResult() {
- return result;
- }
-
- public void setResult(String result) {
- this.result = result;
- }
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
index f3f2e7f..1b5eddb 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
@@ -165,8 +165,7 @@ public class TaskResponseService {
taskResponseEvent.getProcessId(),
taskResponseEvent.getAppIds(),
taskResponseEvent.getTaskInstanceId(),
- taskResponseEvent.getVarPool(),
- taskResponseEvent.getResult()
+ taskResponseEvent.getVarPool()
);
}
// if taskInstance is null (maybe deleted) . retry will be
meaningless . so response success
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 3a2e304..1286818 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.registry;
+import static
org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static
org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
@@ -135,18 +136,6 @@ public class MasterRegistryClient {
}
/**
- * init system node
- */
- private void initMasterSystemNode() {
- try {
-
registryClient.persist(Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, "");
- logger.info("initialize master server nodes success.");
- } catch (Exception e) {
- logger.error("init system node failed", e);
- }
- }
-
- /**
* remove zookeeper node path
*
* @param path zookeeper node path
@@ -346,7 +335,6 @@ public class MasterRegistryClient {
* registry
*/
public void registry() {
- initMasterSystemNode();
String address = NetUtils.getAddr(masterConfig.getListenPort());
localNodePath = getMasterPath();
registryClient.persistEphemeral(localNodePath, "");
@@ -395,7 +383,7 @@ public class MasterRegistryClient {
*/
public String getMasterPath() {
String address = getLocalAddress();
- return registryClient.getMasterPath() + "/" + address;
+ return REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + address;
}
/**
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index 0162af6..6a9167e 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -17,6 +17,9 @@
package org.apache.dolphinscheduler.server.master.registry;
+import static
org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
+import static
org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.utils.StringUtils;
@@ -131,11 +134,11 @@ public class ServerNodeManager implements
InitializingBean {
/**
* init MasterNodeListener listener
*/
- registryClient.subscribe(registryClient.getMasterPath(), new
MasterDataListener());
+ registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_MASTERS, new
MasterDataListener());
/**
* init WorkerNodeListener listener
*/
- registryClient.subscribe(registryClient.getWorkerPath(), new
MasterDataListener());
+ registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new
MasterDataListener());
}
/**
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 0720a9b..111262e 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -22,7 +22,6 @@ import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
-import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static
org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
import org.apache.dolphinscheduler.common.Constants;
@@ -47,7 +46,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.common.utils.VarPoolUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Schedule;
@@ -60,7 +58,6 @@ import
org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
-import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -470,8 +467,6 @@ public class MasterExecThread implements Runnable {
* @return TaskInstance
*/
private TaskInstance createTaskInstance(ProcessInstance processInstance,
TaskNode taskNode) {
- //update processInstance for update the globalParams
- this.processInstance =
this.processService.findProcessInstanceById(this.processInstance.getId());
TaskInstance taskInstance = findTaskIfExists(taskNode.getCode(),
taskNode.getVersion());
if (taskInstance == null) {
taskInstance = new TaskInstance();
@@ -503,6 +498,9 @@ public class MasterExecThread implements Runnable {
// retry task instance interval
taskInstance.setRetryInterval(taskNode.getRetryInterval());
+ //set task param
+ taskInstance.setTaskParams(taskNode.getTaskParams());
+
// task instance priority
if (taskNode.getTaskInstancePriority() == null) {
taskInstance.setTaskInstancePriority(Priority.MEDIUM);
@@ -518,54 +516,74 @@ public class MasterExecThread implements Runnable {
} else {
taskInstance.setWorkerGroup(taskWorkerGroup);
}
-
taskInstance.setTaskParams(globalParamToTaskParams(taskNode.getTaskParams()));
// delay execution time
taskInstance.setDelayTime(taskNode.getDelayTime());
}
+
+ //get pre task ,get all the task varPool to this task
+ Set<String> preTask = dag.getPreviousNodes(taskInstance.getName());
+ getPreVarPool(taskInstance, preTask);
return taskInstance;
}
- private String globalParamToTaskParams(String params) {
- String globalParams = this.processInstance.getGlobalParams();
- if (StringUtils.isBlank(globalParams)) {
- return params;
- }
- Map<String, String> globalMap =
processService.getGlobalParamMap(globalParams);
- if (globalMap == null || globalMap.size() == 0) {
- return params;
- }
- // the process global param save in localParams
- Map<String, Object> result = JSONUtils.toMap(params, String.class,
Object.class);
- Object localParams = result.get(LOCAL_PARAMS);
- if (localParams != null) {
- List<Property> allParam =
JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
- for (Property info : allParam) {
- String paramName = info.getProp();
- if (StringUtils.isNotEmpty(paramName) &&
propToValue.containsKey(paramName)) {
- info.setValue((String) propToValue.get(paramName));
+ public void getPreVarPool(TaskInstance taskInstance, Set<String> preTask)
{ // merge the varPool of the given previous tasks into taskInstance
+ Map<String,Property> allProperty = new HashMap<>(); // property name -> property chosen so far
+ Map<String,TaskInstance> allTaskInstance = new HashMap<>(); // property name -> previous task that supplied the chosen property
+ if (CollectionUtils.isNotEmpty(preTask)) {
+ for (String preTaskName : preTask) {
+ TaskInstance preTaskInstance =
completeTaskList.get(preTaskName);
+ if (preTaskInstance == null) { // previous task has no entry in completeTaskList: skip it
+ continue;
}
- if (info.getDirect().equals(Direct.IN)) {
- String value = globalMap.get(paramName);
- if (StringUtils.isNotEmpty(value)) {
- info.setValue(value);
+ String preVarPool = preTaskInstance.getVarPool();
+ if (StringUtils.isNotEmpty(preVarPool)) {
+ List<Property> properties = JSONUtils.toList(preVarPool,
Property.class); // the varPool string is parsed as a JSON list of Property
+ for (Property info : properties) {
+ setVarPoolValue(allProperty, allTaskInstance,
preTaskInstance, info);
}
}
}
- result.put(LOCAL_PARAMS, allParam);
+ if (allProperty.size() > 0) { // the varPool of taskInstance is only replaced when something was collected
+
taskInstance.setVarPool(JSONUtils.toJsonString(allProperty.values()));
+ }
+ }
+ }
+
+ private void setVarPoolValue(Map<String, Property> allProperty,
Map<String, TaskInstance> allTaskInstance, TaskInstance preTaskInstance,
Property thisProperty) { // merges one property of a previous task into allProperty, resolving name clashes
+ //every property taken over from a previous task is marked as an IN param
+ thisProperty.setDirect(Direct.IN);
+ //name of the property coming from the previous taskInstance
+ String proName = thisProperty.getProp();
+ //another previous node already supplied a property with the same name
+ if (allProperty.containsKey(proName)) {
+ //compare the values of the two properties
+ Property otherPro = allProperty.get(proName);
+ //if the incoming property's value is empty, keep the other one, whether
the other's value is empty or not
+ if (StringUtils.isEmpty(thisProperty.getValue())) {
+ allProperty.put(proName, otherPro); // re-puts the value that is already mapped, so the entry is unchanged
+ //if the incoming value is not empty and the other's
value is not empty too, use the earlier value
+ } else if (StringUtils.isNotEmpty(otherPro.getValue())) {
+ TaskInstance otherTask = allTaskInstance.get(proName);
+ if (otherTask.getEndTime().getTime() >
preTaskInstance.getEndTime().getTime()) { // NOTE(review): assumes both tasks have a non-null end time - confirm
+ allProperty.put(proName, thisProperty); // incoming task ended earlier: its property wins
+ allTaskInstance.put(proName,preTaskInstance);
+ } else {
+ allProperty.put(proName, otherPro); // other task ended earlier or at the same time: entry is unchanged
+ }
+ } else {
+ allProperty.put(proName, thisProperty); // only the incoming property has a value: it replaces the other
+ allTaskInstance.put(proName,preTaskInstance);
+ }
+ } else {
+ allProperty.put(proName, thisProperty); // first property with this name: record it and its source task
+ allTaskInstance.put(proName,preTaskInstance);
}
- return JSONUtils.toJsonString(result);
}
private void submitPostNode(String parentNodeName) {
Set<String> submitTaskNodeList =
DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag,
completeTaskList);
List<TaskInstance> taskInstances = new ArrayList<>();
for (String taskNode : submitTaskNodeList) {
- try {
- VarPoolUtils.convertVarPoolToMap(propToValue,
processInstance.getVarPool());
- } catch (ParseException e) {
- logger.error("parse {} exception",
processInstance.getVarPool(), e);
- throw new RuntimeException();
- }
TaskNode taskNodeObject = dag.getNode(taskNode);
taskInstances.add(createTaskInstance(processInstance,
taskNodeObject));
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
index 875c69c..a49d915 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
@@ -47,6 +47,7 @@ public class ParamUtils {
public static Map<String,Property> convert(Map<String,Property>
globalParams,
Map<String,String>
globalParamsMap,
Map<String,Property> localParams,
+
Map<String,Property> varParams,
CommandType
commandType,
Date scheduleTime) {
if (globalParams == null && localParams == null) {
@@ -64,10 +65,15 @@ public class ParamUtils {
}
if (globalParams != null && localParams != null) {
- globalParams.putAll(localParams);
+ localParams.putAll(globalParams);
+ globalParams = localParams;
} else if (globalParams == null && localParams != null) {
globalParams = localParams;
}
+ if (varParams != null) {
+ varParams.putAll(globalParams);
+ globalParams = varParams;
+ }
Iterator<Map.Entry<String, Property>> iter =
globalParams.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, Property> en = iter.next();
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index 4db4d17..3b0dedb 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.worker.registry;
import static
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+import static
org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import static org.apache.dolphinscheduler.common.Constants.SLASH;
import org.apache.dolphinscheduler.common.Constants;
@@ -130,7 +131,7 @@ public class WorkerRegistryClient {
public Set<String> getWorkerZkPaths() {
Set<String> workerPaths = Sets.newHashSet();
String address = getLocalAddress();
- String workerZkPathPrefix = registryClient.getWorkerPath();
+ String workerZkPathPrefix = REGISTRY_DOLPHINSCHEDULER_WORKERS;
for (String workGroup : this.workerGroups) {
StringJoiner workerPathJoiner = new StringJoiner(SLASH);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 6fd4f34..50847f7 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -151,10 +151,10 @@ public class TaskExecuteThread implements Runnable,
Delayed {
taskExecutionContext.getTaskInstanceId()));
task = TaskManager.newTask(taskExecutionContext, taskLogger,
alertClientService);
-
// task init
task.init();
-
+ //init varPool
+ task.getParameters().setVarPool(taskExecutionContext.getVarPool());
// task handle
task.handle();
@@ -165,8 +165,7 @@ public class TaskExecuteThread implements Runnable, Delayed
{
responseCommand.setEndTime(new Date());
responseCommand.setProcessId(task.getProcessId());
responseCommand.setAppIds(task.getAppIds());
- responseCommand.setVarPool(task.getVarPool());
- responseCommand.setResult(task.getResultString());
+
responseCommand.setVarPool(JSONUtils.toJsonString(task.getParameters().getVarPool()));
logger.info("task instance id : {},task final status : {}",
taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
} catch (Exception e) {
logger.error("task scheduler failure", e);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
index e408f11..3ea7bd2 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
@@ -89,11 +89,6 @@ public abstract class AbstractCommandExecutor {
protected boolean logOutputIsScuccess = false;
/**
- * SHELL result string
- */
- protected String taskResultString;
-
- /**
* taskExecutionContext
*/
protected TaskExecutionContext taskExecutionContext;
@@ -207,8 +202,8 @@ public abstract class AbstractCommandExecutor {
// waiting for the run to finish
boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
- logger.info("process has exited, execute path:{}, processId:{}
,exitStatusCode:{}",
- taskExecutionContext.getExecutePath(), processId,
result.getExitStatusCode());
+ logger.info("process has exited, execute path:{}, processId:{}
,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
+ taskExecutionContext.getExecutePath(), processId,
result.getExitStatusCode(), status, process.exitValue());
// if SHELL task exit
if (status) {
@@ -224,7 +219,8 @@ public abstract class AbstractCommandExecutor {
result.setExitStatusCode(isSuccessOfYarnState(appIds) ?
EXIT_CODE_SUCCESS : EXIT_CODE_FAILURE);
}
} else {
- logger.error("process has failure , exitStatusCode : {} , ready to
kill ...", result.getExitStatusCode());
+ logger.error("process has failure , exitStatusCode:{},
processExitValue:{}, ready to kill ...",
+ result.getExitStatusCode(), process.exitValue());
ProcessUtils.kill(taskExecutionContext);
result.setExitStatusCode(EXIT_CODE_FAILURE);
}
@@ -364,7 +360,6 @@ public abstract class AbstractCommandExecutor {
varPool.append("$VarPool$");
} else {
logBuffer.add(line);
- taskResultString = line;
}
}
} catch (Exception e) {
@@ -592,11 +587,4 @@ public abstract class AbstractCommandExecutor {
protected abstract void createCommandFileIfNotExists(String execCommand,
String commandFile) throws IOException;
- public String getTaskResultString() {
- return taskResultString;
- }
-
- public void setTaskResultString(String taskResultString) {
- this.taskResultString = taskResultString;
- }
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
index 45b94d2..81b8097 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
@@ -36,11 +36,6 @@ import org.slf4j.Logger;
public abstract class AbstractTask {
/**
- * varPool string
- */
- protected String varPool;
-
- /**
* taskExecutionContext
**/
TaskExecutionContext taskExecutionContext;
@@ -57,11 +52,6 @@ public abstract class AbstractTask {
protected int processId;
/**
- * SHELL result string
- */
- protected String resultString;
-
- /**
* other resource manager appId , for example : YARN etc
*/
protected String appIds;
@@ -81,7 +71,7 @@ public abstract class AbstractTask {
* constructor
*
* @param taskExecutionContext taskExecutionContext
- * @param logger logger
+ * @param logger logger
*/
protected AbstractTask(TaskExecutionContext taskExecutionContext, Logger
logger) {
this.taskExecutionContext = taskExecutionContext;
@@ -139,14 +129,6 @@ public abstract class AbstractTask {
}
}
- public void setVarPool(String varPool) {
- this.varPool = varPool;
- }
-
- public String getVarPool() {
- return varPool;
- }
-
/**
* get exit status code
*
@@ -176,14 +158,6 @@ public abstract class AbstractTask {
this.processId = processId;
}
- public String getResultString() {
- return resultString;
- }
-
- public void setResultString(String resultString) {
- this.resultString = resultString;
- }
-
/**
* get task parameters
*
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
index a8aa132..b785cb5 100755
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
@@ -158,6 +158,7 @@ public class DataxTask extends AbstractTask {
Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
dataXParameters.getLocalParametersMap(),
+ dataXParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
index 4d34190..27e5b42 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
@@ -84,6 +84,7 @@ public class FlinkTask extends AbstractYarnTask {
Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
flinkParameters.getLocalParametersMap(),
+ flinkParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
index 87adaab..7c68bc1c 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
@@ -135,6 +135,7 @@ public class HttpTask extends AbstractTask {
Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
httpParameters.getLocalParametersMap(),
+ httpParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
List<HttpProperty> httpPropertyList = new ArrayList<>();
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
index f60b1cb..ce908df 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
@@ -88,6 +88,7 @@ public class MapReduceTask extends AbstractYarnTask {
Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
mapreduceParameters.getLocalParametersMap(),
+ mapreduceParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java
index 2166b1f..3748c7a 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java
@@ -122,6 +122,7 @@ public class ProcedureTask extends AbstractTask {
Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
procedureParameters.getLocalParametersMap(),
+ procedureParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
index 6e561c1..e784a79 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
@@ -92,7 +92,7 @@ public class PythonTask extends AbstractTask {
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
- setVarPool(pythonCommandExecutor.getVarPool());
+ pythonParameters.dealOutParam(pythonCommandExecutor.getVarPool());
}
catch (Exception e) {
logger.error("python task failure", e);
@@ -119,6 +119,7 @@ public class PythonTask extends AbstractTask {
Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
pythonParameters.getLocalParametersMap(),
+ pythonParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
index 85f8ea0..e193571 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
@@ -105,8 +105,7 @@ public class ShellTask extends AbstractTask {
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
- setResult(shellCommandExecutor.getTaskResultString());
- setVarPool(shellCommandExecutor.getVarPool());
+ shellParameters.dealOutParam(shellCommandExecutor.getVarPool());
} catch (Exception e) {
logger.error("shell task error", e);
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
@@ -169,6 +168,7 @@ public class ShellTask extends AbstractTask {
Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
shellParameters.getLocalParametersMap(),
+ shellParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
// replace variable TIME with $[YYYYmmddd...] in shell file when
history run job and batch complement job
@@ -188,17 +188,4 @@ public class ShellTask extends AbstractTask {
}
return ParameterUtils.convertParameterPlaceholders(script,
ParamUtils.convert(paramsMap));
}
-
- public void setResult(String result) {
- Map<String, Property> localParams =
shellParameters.getLocalParametersMap();
- List<Map<String, String>> outProperties = new ArrayList<>();
- Map<String, String> p = new HashMap<>();
- localParams.forEach((k,v) -> {
- if (v.getDirect() == Direct.OUT) {
- p.put(k, result);
- }
- });
- outProperties.add(p);
- resultString = JSONUtils.toJsonString(outProperties);
- }
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
index f6fec0f..a5a641c 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
@@ -113,6 +113,7 @@ public class SparkTask extends AbstractYarnTask {
Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
sparkParameters.getLocalParametersMap(),
+ sparkParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
index 100f344..b174734 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
@@ -86,12 +86,6 @@ public class SqlTask extends AbstractTask {
*/
private TaskExecutionContext taskExecutionContext;
- /**
- * default query sql limit
- */
- private static final int LIMIT = 10000;
-
-
private AlertClientService alertClientService;
public SqlTask(TaskExecutionContext taskExecutionContext, Logger logger,
AlertClientService alertClientService) {
@@ -117,14 +111,16 @@ public class SqlTask extends AbstractTask {
Thread.currentThread().setName(threadLoggerInfoName);
logger.info("Full sql parameters: {}", sqlParameters);
- logger.info("sql type : {}, datasource : {}, sql : {} , localParams :
{},udfs : {},showType : {},connParams : {}",
+ logger.info("sql type : {}, datasource : {}, sql : {} , localParams :
{},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit
{}",
sqlParameters.getType(),
sqlParameters.getDatasource(),
sqlParameters.getSql(),
sqlParameters.getLocalParams(),
sqlParameters.getUdfs(),
sqlParameters.getShowType(),
- sqlParameters.getConnParams());
+ sqlParameters.getConnParams(),
+ sqlParameters.getVarPool(),
+ sqlParameters.getLimit());
try {
SQLTaskExecutionContext sqlTaskExecutionContext =
taskExecutionContext.getSqlTaskExecutionContext();
@@ -175,6 +171,7 @@ public class SqlTask extends AbstractTask {
Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
sqlParameters.getLocalParametersMap(),
+ sqlParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
@@ -268,10 +265,9 @@ public class SqlTask extends AbstractTask {
String updateResult = String.valueOf(stmt.executeUpdate());
result = setNonQuerySqlReturn(updateResult,
sqlParameters.getLocalParams());
}
-
+ //deal out params
+ sqlParameters.dealOutParam(result);
postSql(connection, postStatementsBinds);
- this.setResultString(result);
-
} catch (Exception e) {
logger.error("execute sql error: {}", e.getMessage());
throw e;
@@ -280,6 +276,7 @@ public class SqlTask extends AbstractTask {
}
}
+
public String setNonQuerySqlReturn(String updateResult, List<Property>
properties) {
String result = null;
for (Property info :properties) {
@@ -309,7 +306,7 @@ public class SqlTask extends AbstractTask {
int rowCount = 0;
- while (rowCount < LIMIT && resultSet.next()) {
+ while (rowCount < sqlParameters.getLimit() && resultSet.next()) {
ObjectNode mapOfColValues = JSONUtils.createObjectNode();
for (int i = 1; i <= num; i++) {
mapOfColValues.set(md.getColumnLabel(i),
JSONUtils.toJsonNode(resultSet.getObject(i)));
@@ -326,12 +323,11 @@ public class SqlTask extends AbstractTask {
logger.info("row {} : {}", i + 1, row);
}
}
-
String result = JSONUtils.toJsonString(resultJSONArray);
if (sqlParameters.getSendEmail() == null ||
sqlParameters.getSendEmail()) {
sendAttachment(sqlParameters.getGroupId(),
StringUtils.isNotEmpty(sqlParameters.getTitle())
- ? sqlParameters.getTitle()
- : taskExecutionContext.getTaskName() + " query
result sets", result);
+ ? sqlParameters.getTitle()
+ : taskExecutionContext.getTaskName() + " query result
sets", result);
}
logger.debug("execute sql result : {}", result);
return result;
@@ -478,8 +474,16 @@ public class SqlTask extends AbstractTask {
String paramName = m.group(1);
Property prop = paramsPropsMap.get(paramName);
- sqlParamsMap.put(index, prop);
- index++;
+ if (prop == null) {
+ logger.error("setSqlParamsMap: No Property with paramName: {}
is found in paramsPropsMap of task instance"
+ + " with id: {}. So couldn't put Property in
sqlParamsMap.", paramName, taskExecutionContext.getTaskInstanceId());
+ }
+ else {
+ sqlParamsMap.put(index, prop);
+ index++;
+ logger.info("setSqlParamsMap: Property with paramName: {} put
in sqlParamsMap of content {} successfully.", paramName, content);
+ }
+
}
}
@@ -495,8 +499,13 @@ public class SqlTask extends AbstractTask {
//parameter print style
logger.info("after replace sql , preparing : {}", formatSql);
StringBuilder logPrint = new StringBuilder("replaced sql ,
parameters:");
- for (int i = 1; i <= sqlParamsMap.size(); i++) {
- logPrint.append(sqlParamsMap.get(i).getValue() + "(" +
sqlParamsMap.get(i).getType() + ")");
+ if (sqlParamsMap == null) {
+ logger.info("printReplacedSql: sqlParamsMap is null.");
+ }
+ else {
+ for (int i = 1; i <= sqlParamsMap.size(); i++) {
+ logPrint.append(sqlParamsMap.get(i).getValue() + "(" +
sqlParamsMap.get(i).getType() + ")");
+ }
}
logger.info("Sql Params are {}", logPrint);
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
index 00d94f0..1d1b32d 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
@@ -76,6 +76,7 @@ public class SqoopTask extends AbstractYarnTask {
Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(sqoopTaskExecutionContext.getDefinedParams()),
sqoopTaskExecutionContext.getDefinedParams(),
sqoopParameters.getLocalParametersMap(),
+ sqoopParameters.getVarPoolMap(),
CommandType.of(sqoopTaskExecutionContext.getCmdTypeIfComplement()),
sqoopTaskExecutionContext.getScheduleTime());
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
index a42f187..fbc4ed8 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
@@ -44,10 +44,14 @@ import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.text.ParseException;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.junit.Assert;
import org.junit.Before;
@@ -207,6 +211,50 @@ public class MasterExecThreadTest {
}
}
+    /**
+     * Checks that {@code getPreVarPool} leaves a non-null var pool on the task instance,
+     * first when the two completed predecessors export different props and then when
+     * both of them export the same prop ({@code test1}).
+     *
+     * @throws Exception if injecting {@code completeTaskList} by reflection or the call
+     *                   under test fails; propagated so the test report shows the cause
+     */
+    @Test
+    public void testGetPreVarPool() throws Exception {
+        Set<String> preTaskName = new HashSet<>();
+        preTaskName.add("test1");
+        preTaskName.add("test2");
+        Map<String, TaskInstance> completeTaskList = new ConcurrentHashMap<>();
+
+        TaskInstance taskInstance = new TaskInstance();
+
+        TaskInstance taskInstance1 = new TaskInstance();
+        taskInstance1.setId(1);
+        taskInstance1.setName("test1");
+        taskInstance1.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"1\"}]");
+        taskInstance1.setEndTime(new Date());
+
+        TaskInstance taskInstance2 = new TaskInstance();
+        taskInstance2.setId(2);
+        taskInstance2.setName("test2");
+        taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test2\",\"type\":\"VARCHAR\",\"value\":\"2\"}]");
+        taskInstance2.setEndTime(new Date());
+
+        completeTaskList.put("test1", taskInstance1);
+        completeTaskList.put("test2", taskInstance2);
+
+        // Inject the map into masterExecThread's completeTaskList field via reflection.
+        Field field = MasterExecThread.class.getDeclaredField("completeTaskList");
+        field.setAccessible(true);
+        field.set(masterExecThread, completeTaskList);
+
+        masterExecThread.getPreVarPool(taskInstance, preTaskName);
+        Assert.assertNotNull(taskInstance.getVarPool());
+
+        // Second scenario: both predecessors now export the same prop "test1".
+        taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"2\"}]");
+        completeTaskList.put("test2", taskInstance2);
+        field.set(masterExecThread, completeTaskList);
+        masterExecThread.getPreVarPool(taskInstance, preTaskName);
+        Assert.assertNotNull(taskInstance.getVarPool());
+    }
+
private List<Schedule> zeroSchedulerList() {
return Collections.EMPTY_LIST;
}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java
index 48b34d5..12613c6 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java
@@ -20,20 +20,20 @@ import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.process.Property;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
-import org.apache.dolphinscheduler.common.utils.*;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* user define param
@@ -73,20 +73,19 @@ public class ParamsTest {
}
@Test
- public void convertTest()throws Exception{
- Map<String,Property> globalParams = new HashMap<>();
+ public void convertTest() throws Exception {
+ Map<String, Property> globalParams = new HashMap<>();
Property property = new Property();
property.setProp("global_param");
property.setDirect(Direct.IN);
property.setType(DataType.VARCHAR);
property.setValue("${system.biz.date}");
- globalParams.put("global_param",property);
+ globalParams.put("global_param", property);
- Map<String,String> globalParamsMap = new HashMap<>();
- globalParamsMap.put("global_param","${system.biz.date}");
+ Map<String, String> globalParamsMap = new HashMap<>();
+ globalParamsMap.put("global_param", "${system.biz.date}");
-
- Map<String,Property> localParams = new HashMap<>();
+ Map<String, Property> localParams = new HashMap<>();
Property localProperty = new Property();
localProperty.setProp("local_param");
localProperty.setDirect(Direct.IN);
@@ -94,8 +93,16 @@ public class ParamsTest {
localProperty.setValue("${global_param}");
localParams.put("local_param", localProperty);
+ Map<String, Property> varPoolParams = new HashMap<>();
+ Property varProperty = new Property();
+ varProperty.setProp("local_param");
+ varProperty.setDirect(Direct.IN);
+ varProperty.setType(DataType.VARCHAR);
+ varProperty.setValue("${global_param}");
+ varPoolParams.put("varPool", varProperty);
+
Map<String, Property> paramsMap = ParamUtils.convert(globalParams,
globalParamsMap,
- localParams, CommandType.START_PROCESS, new Date());
+ localParams,varPoolParams, CommandType.START_PROCESS, new
Date());
logger.info(JSONUtils.toJsonString(paramsMap));
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
index ec0807c..5d10f84 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
@@ -70,8 +70,7 @@ public class TaskResponseServiceTest {
"ids",
22,
"varPol",
- channel,
-
"[{\"id\":70000,\"database_name\":\"yuul\",\"status\":-1,\"create_time\":1601202829000,\"update_time\":1601202829000,\"table_name3\":\"\",\"table_name4\":\"\"}]");
+ channel);
taskInstance = new TaskInstance();
taskInstance.setId(22);
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
index 220cce5..a9a1b89 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
@@ -17,23 +17,24 @@
package org.apache.dolphinscheduler.server.utils;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.process.Property;
-import org.apache.dolphinscheduler.common.utils.*;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Test ParamUtils
@@ -49,8 +50,11 @@ public class ParamUtilsTest {
public Map<String, Property> localParams = new HashMap<>();
+ public Map<String, Property> varPoolParams = new HashMap<>();
+
/**
* Init params
+ *
* @throws Exception
*/
@Before
@@ -71,6 +75,14 @@ public class ParamUtilsTest {
localProperty.setType(DataType.VARCHAR);
localProperty.setValue("${global_param}");
localParams.put("local_param", localProperty);
+
+ Property varProperty = new Property();
+ varProperty.setProp("local_param");
+ varProperty.setDirect(Direct.IN);
+ varProperty.setType(DataType.VARCHAR);
+ varProperty.setValue("${global_param}");
+ varPoolParams.put("varPool", varProperty);
+
}
/**
@@ -80,16 +92,20 @@ public class ParamUtilsTest {
public void testConvert() {
//The expected value
- String expected =
"{\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}";
+ String expected =
"{\"varPool\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+ +
"\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+ +
"\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}";
//The expected value when globalParams is null but localParams is not
null
- String expected1 =
"{\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}";
+ String expected1 =
"{\"varPool\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+ +
"\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+ +
"\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}";
//Define expected date , the month is 0-base
Calendar calendar = Calendar.getInstance();
- calendar.set(2019,11,30);
+ calendar.set(2019, 11, 30);
Date date = calendar.getTime();
//Invoke convert
- Map<String, Property> paramsMap = ParamUtils.convert(globalParams,
globalParamsMap, localParams, CommandType.START_PROCESS, date);
+ Map<String, Property> paramsMap = ParamUtils.convert(globalParams,
globalParamsMap, localParams, varPoolParams,CommandType.START_PROCESS, date);
String result = JSONUtils.toJsonString(paramsMap);
assertEquals(expected, result);
@@ -101,12 +117,12 @@ public class ParamUtilsTest {
}
//Invoke convert with null globalParams
- Map<String, Property> paramsMap1 = ParamUtils.convert(null,
globalParamsMap, localParams, CommandType.START_PROCESS, date);
+ Map<String, Property> paramsMap1 = ParamUtils.convert(null,
globalParamsMap, localParams,varPoolParams, CommandType.START_PROCESS, date);
String result1 = JSONUtils.toJsonString(paramsMap1);
assertEquals(expected1, result1);
//Null check, invoke convert with null globalParams and null
localParams
- Map<String, Property> paramsMap2 = ParamUtils.convert(null,
globalParamsMap, null, CommandType.START_PROCESS, date);
+ Map<String, Property> paramsMap2 = ParamUtils.convert(null,
globalParamsMap, null, varPoolParams,CommandType.START_PROCESS, date);
assertNull(paramsMap2);
}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
index c3f6478..53c60d7 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
@@ -67,8 +67,6 @@ public class TaskCallbackServiceTest {
taskCallbackService.sendAck(1, ackCommand.convert2Command());
TaskExecuteResponseCommand responseCommand = new
TaskExecuteResponseCommand();
- String result = responseCommand.getResult();
- responseCommand.setResult("return string");
taskCallbackService.sendResult(1, responseCommand.convert2Command());
Stopper.stop();
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessorTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessorTest.java
index 36a758a..25fa22a 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessorTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessorTest.java
@@ -30,6 +30,7 @@ import
org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import
org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.log.LogClientService;
@@ -53,6 +54,8 @@ public class TaskKillProcessorTest {
private TaskKillProcessor taskKillProcessor;
+ private WorkerManagerThread workerManager;
+
private TaskExecutionContextCacheManagerImpl
taskExecutionContextCacheManager;
private Channel channel;
@@ -85,6 +88,8 @@ public class TaskKillProcessorTest {
PowerMockito.mockStatic(LoggerUtils.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class)).thenReturn(taskCallbackService);
PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)).thenReturn(workerConfig);
+ WorkerManagerThread workerManager =
PowerMockito.mock(WorkerManagerThread.class);
+
PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class)).thenReturn(workerManager);
PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class)).thenReturn(taskExecutionContextCacheManager);
PowerMockito.doNothing().when(taskCallbackService).addRemoteChannel(anyInt(),
any());
PowerMockito.whenNew(NettyRemoteChannel.class).withAnyArguments().thenReturn(null);
@@ -102,7 +107,6 @@ public class TaskKillProcessorTest {
@Test
public void testProcess() {
-
PowerMockito.when(taskExecutionContextCacheManager.getByTaskInstanceId(1)).thenReturn(taskExecutionContext);
taskKillProcessor.process(channel, command);
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
index b3517d3..bbc131d 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
@@ -71,8 +71,6 @@ public class WorkerRegistryClientTest {
@Before
public void before() {
-
- given(registryClient.getWorkerPath()).willReturn("/nodes/worker");
given(workerConfig.getWorkerGroups()).willReturn(Sets.newHashSet("127.0.0.1"));
//given(heartBeatExecutor.getWorkerGroups()).willReturn(Sets.newHashSet("127.0.0.1"));
//scheduleAtFixedRate
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
index e176462..0c337e0 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
@@ -21,6 +21,7 @@ import
org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
+import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
@@ -165,7 +166,7 @@ public class TaskExecuteThreadTest {
@Override
public AbstractParameters getParameters() {
- return null;
+ return new SqlParameters();
}
@Override
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutorTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutorTest.java
deleted file mode 100644
index 348775c..0000000
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutorTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.task;
-
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({SpringApplicationContext.class})
-public class AbstractCommandExecutorTest {
-
- private static final Logger logger =
LoggerFactory.getLogger(AbstractCommandExecutorTest.class);
-
- private ShellCommandExecutor shellCommandExecutor;
-
- @Before
- public void before() throws Exception {
- System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
- shellCommandExecutor = new ShellCommandExecutor(null);
- }
-
- @Test
- public void testSetTaskResultString() {
- shellCommandExecutor.setTaskResultString("shellReturn");
- }
-
- @Test
- public void testGetTaskResultString() {
- logger.info(shellCommandExecutor.getTaskResultString());
- }
-}
\ No newline at end of file
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java
index 892299c..574f0e7 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java
@@ -110,17 +110,6 @@ public class ShellTaskReturnTest {
} catch (Exception e) {
e.printStackTrace();
}
- shellTask.setResult("shell return string");
- logger.info("shell return string:{}", shellTask.getResultString());
}
- @Test
- public void testSetTaskResultString() {
- shellCommandExecutor.setTaskResultString("shellReturn");
- }
-
- @Test
- public void testGetTaskResultString() {
- logger.info(shellCommandExecutor.getTaskResultString());
- }
}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java
index 46d2713..cb8a189 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java
@@ -164,11 +164,6 @@ public class TaskManagerTest {
definedParams.put("time_gb", "2020-12-16 00:00:00");
taskExecutionContext.setDefinedParams(definedParams);
ShellTask shellTask = (ShellTask)
TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService);
- shellTask.setResultString("shell return");
- String shellReturn = shellTask.getResultString();
- shellTask.init();
- shellTask.setResult(shellReturn);
- Assert.assertSame(shellReturn, "shell return");
}
@Test
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskParamsTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskParamsTest.java
new file mode 100644
index 0000000..f384f83
--- /dev/null
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskParamsTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.task;
+
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.dolphinscheduler.common.enums.DataType;
+import org.apache.dolphinscheduler.common.enums.Direct;
+import org.apache.dolphinscheduler.common.process.Property;
+import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
+import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * tests for dealOutParam on shell and sql task parameters.
+ */
+@RunWith(PowerMockRunner.class)
+public class TaskParamsTest {
+ private static final Logger logger =
LoggerFactory.getLogger(TaskParamsTest.class);
+
+ @Test
+ public void testDealOutParam() {
+ List<Property> properties = new ArrayList<>();
+ Property property = new Property();
+ property.setProp("test1");
+ property.setDirect(Direct.OUT);
+ property.setType(DataType.VARCHAR);
+ property.setValue("test1");
+ properties.add(property);
+
+ ShellParameters shellParameters = new ShellParameters();
+ String resultShell = "key1=value1$VarPoolkey2=value2";
+ shellParameters.varPool = new ArrayList<>();
+ shellParameters.setLocalParams(properties);
+ shellParameters.dealOutParam(resultShell);
+ assertNotNull(shellParameters.getVarPool().get(0));
+
+ String sqlResult =
"[{\"id\":6,\"test1\":\"6\"},{\"id\":70002,\"test1\":\"+1\"}]";
+ SqlParameters sqlParameters = new SqlParameters();
+ String sqlResult1 = "[{\"id\":6,\"test1\":\"6\"}]";
+ sqlParameters.setLocalParams(properties);
+ sqlParameters.varPool = new ArrayList<>();
+ sqlParameters.dealOutParam(sqlResult1);
+ assertNotNull(sqlParameters.getVarPool().get(0));
+
+ property.setType(DataType.LIST);
+ properties.clear();
+ properties.add(property);
+ sqlParameters.setLocalParams(properties);
+ sqlParameters.dealOutParam(sqlResult);
+ assertNotNull(sqlParameters.getVarPool().get(0));
+ }
+
+}
\ No newline at end of file
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java
index bd02f61..c992a0a 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java
@@ -62,7 +62,6 @@ public class ShellTaskTest {
System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class);
PowerMockito.whenNew(ShellCommandExecutor.class).withAnyArguments().thenReturn(shellCommandExecutor);
- shellCommandExecutor.setTaskResultString("shellReturn");
taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskName("kris test");
@@ -85,6 +84,7 @@ public class ShellTaskTest {
taskExecutionContext.setTenantCode("roo");
taskExecutionContext.setScheduleTime(new Date());
taskExecutionContext.setQueue("default");
+
taskExecutionContext.setVarPool("[{\"direct\":\"IN\",\"prop\":\"test\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
taskExecutionContext.setTaskParams(
"{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss
+3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ?????
${time2}\\\"\\n\",\"localParams\":"
+
@@ -105,6 +105,7 @@ public class ShellTaskTest {
public void testComplementData() throws Exception {
shellTask = new ShellTask(taskExecutionContext, logger);
shellTask.init();
+
shellTask.getParameters().setVarPool(taskExecutionContext.getVarPool());
shellCommandExecutor.isSuccessOfYarnState(new ArrayList<>());
shellCommandExecutor.isSuccessOfYarnState(null);
PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult);
@@ -116,16 +117,9 @@ public class ShellTaskTest {
taskExecutionContext.setCmdTypeIfComplement(0);
shellTask = new ShellTask(taskExecutionContext, logger);
shellTask.init();
+
shellTask.getParameters().setVarPool(taskExecutionContext.getVarPool());
PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult);
shellTask.handle();
}
- @Test
- public void testSetResult() {
- shellTask = new ShellTask(taskExecutionContext, logger);
- shellTask.init();
- String r = "return";
- shellTask.setResult(r);
- }
-
}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java
index 4dbcb2b..1266fd1 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java
@@ -89,6 +89,7 @@ public class SqlTaskTest {
PowerMockito.when(taskExecutionContext.getStartTime()).thenReturn(new
Date());
PowerMockito.when(taskExecutionContext.getTaskTimeout()).thenReturn(10000);
PowerMockito.when(taskExecutionContext.getLogPath()).thenReturn("/tmp/dx");
+
PowerMockito.when(taskExecutionContext.getVarPool()).thenReturn("[{\"direct\":\"IN\",\"prop\":\"test\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
SQLTaskExecutionContext sqlTaskExecutionContext = new
SQLTaskExecutionContext();
sqlTaskExecutionContext.setConnectionParams(CONNECTION_PARAMS);
@@ -98,6 +99,7 @@ public class SqlTaskTest {
PowerMockito.when(SpringApplicationContext.getBean(Mockito.any())).thenReturn(new
AlertDao());
alertClientService = PowerMockito.mock(AlertClientService.class);
sqlTask = new SqlTask(taskExecutionContext, logger,
alertClientService);
+ sqlTask.getParameters().setVarPool(taskExecutionContext.getVarPool());
sqlTask.init();
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 69f3c16..ddead50 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -134,7 +134,6 @@ import
org.springframework.transaction.annotation.Transactional;
import com.cronutils.model.Cron;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
@@ -1585,71 +1584,51 @@ public class ProcessService {
int processId,
String appIds,
int taskInstId,
- String varPool,
- String result) {
+ String varPool) {
taskInstance.setPid(processId);
taskInstance.setAppLink(appIds);
taskInstance.setState(state);
taskInstance.setEndTime(endTime);
taskInstance.setVarPool(varPool);
- changeOutParam(result, taskInstance);
+ changeOutParam(taskInstance);
saveTaskInstance(taskInstance);
}
- public void changeOutParam(String result, TaskInstance taskInstance) {
- if (StringUtils.isEmpty(result)) {
+ /**
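+ * copy OUT values from the task instance varPool into its localParams, for display on the task instance page
+ * @param taskInstance task instance whose varPool is read and whose taskParams are rewritten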
+ */
+ public void changeOutParam(TaskInstance taskInstance) {
+ if (StringUtils.isEmpty(taskInstance.getVarPool())) {
return;
}
- List<Map<String, String>> workerResultParam =
getListMapByString(result);
- if (CollectionUtils.isEmpty(workerResultParam)) {
+ List<Property> properties =
JSONUtils.toList(taskInstance.getVarPool(), Property.class);
+ if (CollectionUtils.isEmpty(properties)) {
return;
}
//if the result more than one line,just get the first .
- Map<String, String> row = workerResultParam.get(0);
- if (row == null || row.size() == 0) {
- return;
- }
Map<String, Object> taskParams =
JSONUtils.toMap(taskInstance.getTaskParams(), String.class, Object.class);
Object localParams = taskParams.get(LOCAL_PARAMS);
if (localParams == null) {
return;
}
- ProcessInstance processInstance =
this.processInstanceMapper.queryDetailById(taskInstance.getProcessInstanceId());
- List<Property> params4Property =
JSONUtils.toList(processInstance.getGlobalParams(), Property.class);
- Map<String, Property> allParamMap =
params4Property.stream().collect(Collectors.toMap(Property::getProp, Property
-> Property));
-
List<Property> allParam =
JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
+ Map<String, String> outProperty = new HashMap<>();
+ for (Property info : properties) {
+ if (info.getDirect() == Direct.OUT) {
+ outProperty.put(info.getProp(), info.getValue());
+ }
+ }
for (Property info : allParam) {
if (info.getDirect() == Direct.OUT) {
String paramName = info.getProp();
- Property property = allParamMap.get(paramName);
- if (property == null) {
- continue;
- }
- String value = String.valueOf(row.get(paramName));
- if (StringUtils.isNotEmpty(value)) {
- property.setValue(value);
- info.setValue(value);
- }
+ info.setValue(outProperty.get(paramName));
}
}
taskParams.put(LOCAL_PARAMS, allParam);
taskInstance.setTaskParams(JSONUtils.toJsonString(taskParams));
- String params4ProcessString = JSONUtils.toJsonString(params4Property);
- int updateCount =
this.processInstanceMapper.updateGlobalParamsById(params4ProcessString,
processInstance.getId());
- logger.info("updateCount:{}, params4Process:{}, processInstanceId:{}",
updateCount, params4ProcessString, processInstance.getId());
}
- public List<Map<String, String>> getListMapByString(String json) {
- List<Map<String, String>> allParams = new ArrayList<>();
- ArrayNode paramsByJson = JSONUtils.parseArray(json);
- Iterator<JsonNode> listIterator = paramsByJson.iterator();
- while (listIterator.hasNext()) {
- Map<String, String> param =
JSONUtils.toMap(listIterator.next().toString(), String.class, String.class);
- allParams.add(param);
- }
- return allParams;
- }
/**
* convert integer list to string list
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryCenter.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryCenter.java
index 143821f..119a60a 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryCenter.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryCenter.java
@@ -18,6 +18,8 @@
package org.apache.dolphinscheduler.service.registry;
import static
org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS;
+import static
org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
+import static
org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
@@ -57,16 +59,7 @@ public class RegistryCenter {
*/
protected static String NODES;
- /**
- * master path
- */
- protected static String MASTER_PATH = "/nodes/master";
-
private RegistryPluginManager registryPluginManager;
- /**
- * worker path
- */
- protected static String WORKER_PATH = "/nodes/worker";
protected static final String EMPTY = "";
@@ -113,8 +106,9 @@ public class RegistryCenter {
* init nodes
*/
private void initNodes() {
- persist(MASTER_PATH, EMPTY);
- persist(WORKER_PATH, EMPTY);
+ persist(REGISTRY_DOLPHINSCHEDULER_MASTERS, EMPTY);
+ persist(REGISTRY_DOLPHINSCHEDULER_WORKERS, EMPTY);
+ persist(REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS, EMPTY);
}
/**
@@ -206,31 +200,13 @@ public class RegistryCenter {
}
/**
- * get master path
- *
- * @return master path
- */
- public String getMasterPath() {
- return MASTER_PATH;
- }
-
- /**
* whether master path
*
* @param path path
* @return result
*/
public boolean isMasterPath(String path) {
- return path != null && path.contains(MASTER_PATH);
- }
-
- /**
- * get worker path
- *
- * @return worker path
- */
- public String getWorkerPath() {
- return WORKER_PATH;
+ return path != null &&
path.contains(REGISTRY_DOLPHINSCHEDULER_MASTERS);
}
/**
@@ -240,7 +216,7 @@ public class RegistryCenter {
* @return worker group path
*/
public String getWorkerGroupPath(String workerGroup) {
- return WORKER_PATH + "/" + workerGroup;
+ return REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup;
}
/**
@@ -250,7 +226,7 @@ public class RegistryCenter {
* @return result
*/
public boolean isWorkerPath(String path) {
- return path != null && path.contains(WORKER_PATH);
+ return path != null &&
path.contains(REGISTRY_DOLPHINSCHEDULER_WORKERS);
}
/**
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
index d7afcd9..d9ebf18 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
@@ -22,6 +22,8 @@ import static
org.apache.dolphinscheduler.common.Constants.COLON;
import static org.apache.dolphinscheduler.common.Constants.DELETE_OP;
import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
import static org.apache.dolphinscheduler.common.Constants.MASTER_TYPE;
+import static
org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
+import static
org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.apache.dolphinscheduler.common.Constants.WORKER_TYPE;
@@ -344,7 +346,7 @@ public class RegistryClient extends RegistryCenter {
* @return master nodes
*/
public Set<String> getMasterNodesDirectly() {
- List<String> masters = getChildrenKeys(MASTER_PATH);
+ List<String> masters =
getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_MASTERS);
return new HashSet<>(masters);
}
@@ -354,7 +356,7 @@ public class RegistryClient extends RegistryCenter {
* @return master nodes
*/
public Set<String> getWorkerNodesDirectly() {
- List<String> workers = getChildrenKeys(WORKER_PATH);
+ List<String> workers =
getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS);
return new HashSet<>(workers);
}
@@ -364,7 +366,7 @@ public class RegistryClient extends RegistryCenter {
* @return worker group nodes
*/
public Set<String> getWorkerGroupDirectly() {
- List<String> workers = getChildrenKeys(getWorkerPath());
+ List<String> workers =
getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS);
return new HashSet<>(workers);
}
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 79be9ec..e00cf87 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -509,4 +509,19 @@ public class ProcessServiceTest {
Mockito.verify(commandMapper, Mockito.times(1)).insert(command);
}
+ @Test
+ public void testChangeOutParam() {
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setProcessInstanceId(62);
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setId(62);
+
taskInstance.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
+
taskInstance.setTaskParams("{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select
id from tb_test limit 1\","
+ +
"\"udfs\":\"\",\"sqlType\":\"0\",\"sendEmail\":false,\"displayRows\":10,\"title\":\"\","
+ +
"\"groupId\":null,\"localParams\":[{\"prop\":\"test1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"12\"}],"
+ +
"\"connParams\":\"\",\"preStatements\":[],\"postStatements\":[],\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],"
+ + "\\\"failedNode\\\":[\\\"\\\"]}\",\"dependence\":\"{}\"}");
+ processService.changeOutParam(taskInstance);
+ }
+
}
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/DolphinPluginLoader.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/DolphinPluginLoader.java
index 5d2ad56..66244b2 100644
---
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/DolphinPluginLoader.java
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/DolphinPluginLoader.java
@@ -104,7 +104,7 @@ public class DolphinPluginLoader {
private void loadPlugin(URLClassLoader pluginClassLoader) {
ServiceLoader<DolphinSchedulerPlugin> serviceLoader =
ServiceLoader.load(DolphinSchedulerPlugin.class, pluginClassLoader);
List<DolphinSchedulerPlugin> plugins =
ImmutableList.copyOf(serviceLoader);
- Preconditions.checkState(!plugins.isEmpty(), "No service providers the
plugin {}", DolphinSchedulerPlugin.class.getName());
+ Preconditions.checkState(!plugins.isEmpty(), "No service providers the
plugin %s", DolphinSchedulerPlugin.class.getName());
for (DolphinSchedulerPlugin plugin : plugins) {
logger.info("Installing {}", plugin.getClass().getName());
for (AbstractDolphinPluginManager dolphinPluginManager :
dolphinPluginManagerList) {
diff --git
a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningGroups/_source/createWarning.vue
b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningGroups/_source/createWarning.vue
index 2896093..a2edec8 100644
---
a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningGroups/_source/createWarning.vue
+++
b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningGroups/_source/createWarning.vue
@@ -109,6 +109,10 @@
this.$message.warning(`${i18n.$t('Please enter group name')}`)
return false
}
+ if (!this.alertInstanceIds || !this.alertInstanceIds.length) {
+ this.$message.warning(`${i18n.$t('Select Alarm plugin instance')}`)
+ return false
+ }
return true
},
_submit () {
diff --git
a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningInstance/_source/createWarningInstance.vue
b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningInstance/_source/createWarningInstance.vue
index 07c9faa..6e09918e 100644
---
a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningInstance/_source/createWarningInstance.vue
+++
b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningInstance/_source/createWarningInstance.vue
@@ -111,6 +111,10 @@
this.$message.warning(`${i18n.$t('Please enter group name')}`)
return false
}
+ if (!this.pluginDefineId) {
+ this.$message.warning(`${i18n.$t('Select Alarm plugin')}`)
+ return false
+ }
return true
},
// Select plugin
diff --git
a/dolphinscheduler-ui/src/js/conf/home/pages/user/pages/account/_source/info.vue
b/dolphinscheduler-ui/src/js/conf/home/pages/user/pages/account/_source/info.vue
index ce9e1fb..5f30ed8 100644
---
a/dolphinscheduler-ui/src/js/conf/home/pages/user/pages/account/_source/info.vue
+++
b/dolphinscheduler-ui/src/js/conf/home/pages/user/pages/account/_source/info.vue
@@ -80,7 +80,7 @@
</div>
</template>
<script>
- import { mapState, mapMutations } from 'vuex'
+ import { mapActions, mapState, mapMutations } from 'vuex'
import mListBoxF from '@/module/components/listBoxF/listBoxF'
import mCreateUser from
'@/conf/home/pages/security/pages/users/_source/createUser'
@@ -95,6 +95,7 @@
props: {},
methods: {
...mapMutations('user', ['setUserInfo']),
+ ...mapActions('user', ['getUserInfo']),
/**
* edit
*/
@@ -109,7 +110,9 @@
email: param.email,
phone: param.phone
})
- this.createUserDialog = false
+ this.getUserInfo().finally(() => {
+ this.createUserDialog = false
+ })
},
close () {
diff --git a/dolphinscheduler-ui/src/js/conf/home/store/security/actions.js
b/dolphinscheduler-ui/src/js/conf/home/store/security/actions.js
index 87a96dd..eda5d02 100644
--- a/dolphinscheduler-ui/src/js/conf/home/store/security/actions.js
+++ b/dolphinscheduler-ui/src/js/conf/home/store/security/actions.js
@@ -446,7 +446,7 @@ export default {
*/
updateAlertPluginInstance ({ state }, payload) {
return new Promise((resolve, reject) => {
- io.get('alert-plugin-instance/update', payload, res => {
+ io.post('alert-plugin-instance/update', payload, res => {
resolve(res)
}).catch(e => {
reject(e)
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
index bc05c65..d3f0582 100755
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
@@ -228,10 +228,12 @@ export default {
'Alarm instance name': 'Alarm instance name',
'Alarm plugin name': 'Alarm plugin name',
'Select plugin': 'Select plugin',
+ 'Select Alarm plugin': 'Please select an Alarm plugin',
'Please enter group name': 'Please enter group name',
'Instance parameter exception': 'Instance parameter exception',
'Group Type': 'Group Type',
'Alarm plugin instance': 'Alarm plugin instance',
+ 'Select Alarm plugin instance': 'Please select an Alarm plugin instance',
Remarks: 'Remarks',
SMS: 'SMS',
'Managing Users': 'Managing Users',
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
index 478f675..7065d73 100755
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
@@ -228,10 +228,12 @@ export default {
'Alarm instance name': '告警实例名称',
'Alarm plugin name': '告警插件名称',
'Select plugin': '选择插件',
+ 'Select Alarm plugin': '请选择告警插件',
'Please enter group name': '请输入组名称',
'Instance parameter exception': '实例参数异常',
'Group Type': '组类型',
'Alarm plugin instance': '告警插件实例',
+ 'Select Alarm plugin instance': '请选择告警插件实例',
Remarks: '备注',
SMS: '短信',
'Managing Users': '管理用户',
diff --git a/pom.xml b/pom.xml
index 4f9fa3e..b4be3dd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1010,8 +1010,8 @@
<include>**/server/worker/task/processdure/ProcedureTaskTest.java</include>
<include>**/server/worker/task/shell/ShellTaskTest.java</include>
<include>**/server/worker/task/TaskManagerTest.java</include>
-
<include>**/server/worker/task/AbstractCommandExecutorTest.java</include>
<include>**/server/worker/task/PythonCommandExecutorTest.java</include>
+
<include>**/server/worker/task/TaskParamsTest.java</include>
<include>**/server/worker/task/ShellTaskReturnTest.java</include>
<include>**/server/worker/task/sql/SqlTaskTest.java</include>
<include>**/server/worker/runner/TaskExecuteThreadTest.java</include>