This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch 3.2.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.2.2-prepare by this push:
new e65208b34e Fix task cannot use workflow's environment (#16199) (#16206)
e65208b34e is described below
commit e65208b34ec8d5cacfcd473969a1faa6d38cef27
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Jun 24 23:00:18 2024 +0800
Fix task cannot use workflow's environment (#16199) (#16206)
(cherry picked from commit b34fe4604423ff209e42a6a65ac2e75770625143)
---
.../api/dto/task/TaskCreateRequest.java | 4 +-
.../api/service/WorkerGroupService.java | 8 ---
.../service/impl/ProcessDefinitionServiceImpl.java | 4 +-
.../ProjectWorkerGroupRelationServiceImpl.java | 3 +-
.../api/service/impl/WorkerGroupServiceImpl.java | 27 +-------
.../api/service/ExecuteFunctionServiceTest.java | 29 +++++----
.../api/service/ProcessDefinitionServiceTest.java | 3 +-
.../api/service/WorkerGroupServiceTest.java | 44 -------------
.../common/constants/Constants.java | 6 +-
.../dao/utils/EnvironmentUtils.java | 54 ++++++++++++++++
.../dao/utils/WorkerGroupUtils.java | 45 ++++++++++++++
.../dao/mapper/CommandMapperTest.java | 4 +-
.../dao/repository/impl/CommandDaoImplTest.java | 4 +-
.../dao/utils/EnvironmentUtilsTest.java | 72 ++++++++++++++++++++++
.../dao/utils/WorkerGroupUtilsTest.java | 70 +++++++++++++++++++++
.../e2e/cases/WorkflowJavaTaskE2ETest.java | 4 +-
.../master/dispatch/context/ExecutionContext.java | 57 -----------------
.../server/master/registry/ServerNodeManager.java | 10 ++-
.../master/runner/StreamTaskExecuteRunnable.java | 13 ++--
.../master/runner/WorkflowExecuteRunnable.java | 26 ++++----
.../master/utils/WorkflowInstanceUtilsTest.java | 3 +-
.../scheduler/quartz/ProcessScheduleTask.java | 7 +--
.../service/process/ProcessServiceImpl.java | 8 +--
23 files changed, 306 insertions(+), 199 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java
index 1e651ee6e3..bb362962fa 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java
@@ -19,11 +19,11 @@ package org.apache.dolphinscheduler.api.dto.task;
import static
org.apache.dolphinscheduler.common.constants.Constants.VERSION_FIRST;
-import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import java.util.Date;
@@ -107,7 +107,7 @@ public class TaskCreateRequest {
taskDefinition.setProjectCode(this.projectCode);
taskDefinition.setTaskType(this.taskType);
taskDefinition.setTaskParams(this.taskParams);
- taskDefinition.setWorkerGroup(this.workerGroup == null ?
Constants.DEFAULT_WORKER_GROUP : this.workerGroup);
+
taskDefinition.setWorkerGroup(WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup));
taskDefinition.setEnvironmentCode(this.environmentCode);
taskDefinition.setFailRetryTimes(this.failRetryTimes);
taskDefinition.setFailRetryInterval(this.failRetryInterval);
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
index 2c87e4be2e..b85d3912cb 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.utils.Result;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.List;
@@ -77,13 +76,6 @@ public interface WorkerGroupService {
*/
Map<String, Object> getWorkerAddressList();
- /**
- * Get task instance's worker group
- * @param taskInstance task instance
- * @return worker group
- */
- String getTaskWorkerGroup(TaskInstance taskInstance);
-
/**
* Query worker group by process definition codes
* @param processDefinitionCodeList processDefinitionCodeList
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index e6893b04fb..6a4251cef1 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -34,7 +34,6 @@ import static
org.apache.dolphinscheduler.api.enums.Status.PROCESS_DEFINE_NOT_EX
import static
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static
org.apache.dolphinscheduler.common.constants.Constants.COPY_SUFFIX;
import static org.apache.dolphinscheduler.common.constants.Constants.DATA_LIST;
-import static
org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_WORKER_GROUP;
import static
org.apache.dolphinscheduler.common.constants.Constants.GLOBAL_PARAMS;
import static
org.apache.dolphinscheduler.common.constants.Constants.IMPORT_SUFFIX;
import static
org.apache.dolphinscheduler.common.constants.Constants.LOCAL_PARAMS;
@@ -109,6 +108,7 @@ import
org.apache.dolphinscheduler.dao.model.PageListingResult;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
@@ -1390,7 +1390,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
taskDefinition.setFailRetryTimes(0);
taskDefinition.setFailRetryInterval(0);
taskDefinition.setTimeoutFlag(TimeoutFlag.CLOSE);
- taskDefinition.setWorkerGroup(DEFAULT_WORKER_GROUP);
+
taskDefinition.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
taskDefinition.setTaskPriority(Priority.MEDIUM);
taskDefinition.setEnvironmentCode(-1);
taskDefinition.setTimeout(0);
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java
index 31dc113dbb..c11915667f 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java
@@ -32,6 +32,7 @@ import
org.apache.dolphinscheduler.dao.mapper.ProjectWorkerGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections4.SetUtils;
@@ -118,7 +119,7 @@ public class ProjectWorkerGroupRelationServiceImpl extends
BaseServiceImpl
workerGroupMapper.queryAllWorkerGroup().stream().map(WorkerGroup::getName).collect(
Collectors.toSet());
- workerGroupNames.add(Constants.DEFAULT_WORKER_GROUP);
+ workerGroupNames.add(WorkerGroupUtils.getDefaultWorkerGroup());
Set<String> assignedWorkerGroupNames = new HashSet<>(workerGroups);
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index 9c98880740..4652347987 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -32,7 +32,6 @@ import
org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import
org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper;
@@ -41,6 +40,7 @@ import
org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -357,12 +357,12 @@ public class WorkerGroupServiceImpl extends
BaseServiceImpl implements WorkerGro
workerGroups = workerGroupMapper.queryAllWorkerGroup();
}
boolean containDefaultWorkerGroups = workerGroups.stream()
- .anyMatch(workerGroup ->
Constants.DEFAULT_WORKER_GROUP.equals(workerGroup.getName()));
+ .anyMatch(workerGroup ->
WorkerGroupUtils.isWorkerGroupEmpty(workerGroup.getName()));
if (!containDefaultWorkerGroups) {
// there doesn't exist a default WorkerGroup, we will add all
worker to the default worker group.
Set<String> activeWorkerNodes =
registryClient.getServerNodeSet(RegistryNodeType.WORKER);
WorkerGroup defaultWorkerGroup = new WorkerGroup();
- defaultWorkerGroup.setName(Constants.DEFAULT_WORKER_GROUP);
+
defaultWorkerGroup.setName(WorkerGroupUtils.getDefaultWorkerGroup());
defaultWorkerGroup.setAddrList(String.join(Constants.COMMA,
activeWorkerNodes));
defaultWorkerGroup.setCreateTime(new Date());
defaultWorkerGroup.setUpdateTime(new Date());
@@ -431,27 +431,6 @@ public class WorkerGroupServiceImpl extends
BaseServiceImpl implements WorkerGro
return result;
}
- @Override
- public String getTaskWorkerGroup(TaskInstance taskInstance) {
- if (taskInstance == null) {
- return null;
- }
-
- String workerGroup = taskInstance.getWorkerGroup();
-
- if (StringUtils.isNotEmpty(workerGroup)) {
- return workerGroup;
- }
- int processInstanceId = taskInstance.getProcessInstanceId();
- ProcessInstance processInstance =
processService.findProcessInstanceById(processInstanceId);
-
- if (processInstance != null) {
- return processInstance.getWorkerGroup();
- }
- log.info("task : {} will use default worker group",
taskInstance.getId());
- return Constants.DEFAULT_WORKER_GROUP;
- }
-
@Override
public Map<Long, String>
queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList) {
List<Schedule> processDefinitionScheduleList =
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
index 8f1869c1fb..04dabf5ead 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
@@ -67,6 +67,7 @@ import
org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -271,7 +272,7 @@ public class ExecuteFunctionServiceTest {
null, null,
null, null, null,
RunMode.RUN_MODE_SERIAL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode,
100L, 10, null, null,
+ Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(),
tenantCode, 100L, 10, null, null,
Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null,
@@ -298,7 +299,7 @@ public class ExecuteFunctionServiceTest {
null, "123456789,987654321",
null, null, null,
RunMode.RUN_MODE_SERIAL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode,
100L, 110, null, null,
+ Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(),
tenantCode, 100L, 110, null, null,
Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null,
@@ -323,7 +324,7 @@ public class ExecuteFunctionServiceTest {
null, "1123456789,987654321",
null, null, null,
RunMode.RUN_MODE_SERIAL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode,
100L, 110, null, 0,
+ Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(),
tenantCode, 100L, 110, null, 0,
Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null,
@@ -354,14 +355,14 @@ public class ExecuteFunctionServiceTest {
dependentProcessDefinition.setProcessDefinitionCode(2);
dependentProcessDefinition.setProcessDefinitionVersion(1);
dependentProcessDefinition.setTaskDefinitionCode(1);
-
dependentProcessDefinition.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
+
dependentProcessDefinition.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
dependentProcessDefinition.setTaskParams(
"{\"localParams\":[],\"resourceList\":[],\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"depTaskCode\":2,\"status\":\"SUCCESS\"}]}]},\"conditionResult\":{\"successNode\":[1],\"failedNode\":[1]}}");
Mockito.when(processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode))
.thenReturn(Lists.newArrayList(dependentProcessDefinition));
Map<Long, String> processDefinitionWorkerGroupMap = new HashMap<>();
- processDefinitionWorkerGroupMap.put(1L,
Constants.DEFAULT_WORKER_GROUP);
+ processDefinitionWorkerGroupMap.put(1L,
WorkerGroupUtils.getDefaultWorkerGroup());
Mockito.when(workerGroupService.queryWorkerGroupByProcessDefinitionCodes(Lists.newArrayList(1L)))
.thenReturn(processDefinitionWorkerGroupMap);
@@ -370,7 +371,7 @@ public class ExecuteFunctionServiceTest {
command.setCommandType(CommandType.COMPLEMENT_DATA);
command.setCommandParam(
"{\"StartNodeList\":\"1\",\"complementStartDate\":\"2020-01-01
00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}");
- command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
+ command.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
command.setProcessDefinitionCode(processDefinitionCode);
command.setExecutorId(1);
@@ -383,7 +384,7 @@ public class ExecuteFunctionServiceTest {
childDependent.setProcessDefinitionCode(3);
childDependent.setProcessDefinitionVersion(1);
childDependent.setTaskDefinitionCode(4);
- childDependent.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
+
childDependent.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
childDependent.setTaskParams(
"{\"localParams\":[],\"resourceList\":[],\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"depTaskCode\":3,\"status\":\"SUCCESS\"}]}]},\"conditionResult\":{\"successNode\":[1],\"failedNode\":[1]}}");
Mockito.when(processService.queryDependentProcessDefinitionByProcessDefinitionCode(
@@ -409,7 +410,8 @@ public class ExecuteFunctionServiceTest {
null, null,
null, null, null,
RunMode.RUN_MODE_SERIAL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode,
100L, 110, null, 2, Constants.DRY_RUN_FLAG_NO,
+ Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(),
tenantCode, 100L, 110, null, 2,
+ Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null,
false,
@@ -434,7 +436,7 @@ public class ExecuteFunctionServiceTest {
null, null,
null, null, null,
RunMode.RUN_MODE_SERIAL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode,
100L, 110, null, null,
+ Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(),
tenantCode, 100L, 110, null, null,
Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null,
@@ -460,7 +462,8 @@ public class ExecuteFunctionServiceTest {
null, null,
null, null, null,
RunMode.RUN_MODE_PARALLEL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode,
100L, 110, null, 2, Constants.DRY_RUN_FLAG_NO,
+ Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(),
tenantCode, 100L, 110, null, 2,
+ Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null,
false,
@@ -486,7 +489,7 @@ public class ExecuteFunctionServiceTest {
null, null,
null, null, null,
RunMode.RUN_MODE_PARALLEL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode,
100L, 110, null, 15,
+ Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(),
tenantCode, 100L, 110, null, 15,
Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null,
@@ -514,7 +517,7 @@ public class ExecuteFunctionServiceTest {
null,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW,
- Constants.DEFAULT_WORKER_GROUP,
+ WorkerGroupUtils.getDefaultWorkerGroup(),
tenantCode,
100L,
110,
@@ -553,7 +556,7 @@ public class ExecuteFunctionServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode,
100L, 110, null, 15,
+ Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(),
tenantCode, 100L, 110, null, 15,
Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_YES,
ComplementDependentMode.OFF_MODE, null,
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index ed3a5f639b..37af53dbbd 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -72,6 +72,7 @@ import
org.apache.dolphinscheduler.dao.model.PageListingResult;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -1143,7 +1144,7 @@ public class ProcessDefinitionServiceTest extends
BaseServiceTestTool {
schedule.setProcessInstancePriority(Priority.MEDIUM);
schedule.setWarningType(WarningType.NONE);
schedule.setWarningGroupId(1);
- schedule.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
+ schedule.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
return schedule;
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
index 08a541c5bb..fce9aa3f1c 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
@@ -30,7 +30,6 @@ import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import
org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper;
@@ -65,8 +64,6 @@ import org.slf4j.LoggerFactory;
@MockitoSettings(strictness = Strictness.LENIENT)
public class WorkerGroupServiceTest {
- private static final Logger logger =
LoggerFactory.getLogger(WorkerGroupServiceTest.class);
-
private static final Logger baseServiceLogger =
LoggerFactory.getLogger(BaseServiceImpl.class);
private static final Logger serviceLogger =
LoggerFactory.getLogger(WorkerGroupService.class);
@@ -288,47 +285,6 @@ public class WorkerGroupServiceTest {
Assertions.assertEquals("default", workerGroups.toArray()[0]);
}
- @Test
- public void giveNull_whenGetTaskWorkerGroup_expectNull() {
- String nullWorkerGroup = workerGroupService.getTaskWorkerGroup(null);
- Assertions.assertNull(nullWorkerGroup);
- }
-
- @Test
- public void
giveCorrectTaskInstance_whenGetTaskWorkerGroup_expectTaskWorkerGroup() {
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setId(1);
- taskInstance.setWorkerGroup("cluster1");
-
- String workerGroup =
workerGroupService.getTaskWorkerGroup(taskInstance);
- Assertions.assertEquals("cluster1", workerGroup);
- }
-
- @Test
- public void
giveNullWorkerGroup_whenGetTaskWorkerGroup_expectProcessWorkerGroup() {
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setId(1);
- taskInstance.setProcessInstanceId(1);
- ProcessInstance processInstance = new ProcessInstance();
- processInstance.setId(1);
- processInstance.setWorkerGroup("cluster1");
-
Mockito.when(processService.findProcessInstanceById(1)).thenReturn(processInstance);
-
- String workerGroup =
workerGroupService.getTaskWorkerGroup(taskInstance);
- Assertions.assertEquals("cluster1", workerGroup);
- }
-
- @Test
- public void
giveNullTaskAndProcessWorkerGroup_whenGetTaskWorkerGroup_expectDefault() {
- TaskInstance taskInstance = new TaskInstance();
- taskInstance.setId(1);
- taskInstance.setProcessInstanceId(1);
-
Mockito.when(processService.findProcessInstanceById(1)).thenReturn(null);
-
- String defaultWorkerGroup =
workerGroupService.getTaskWorkerGroup(taskInstance);
- Assertions.assertEquals(Constants.DEFAULT_WORKER_GROUP,
defaultWorkerGroup);
- }
-
/**
* get Group
*/
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
index 5c56818fa0..c74c7c7bbc 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
@@ -47,6 +47,7 @@ public final class Constants {
public static final String RESOURCE_TYPE_UDF = "udfs";
public static final String EMPTY_STRING = "";
+ public static final int MAX_FILE_SIZE = 1024 * 1024 * 1024;
/**
* resource.hdfs.fs.defaultFS
@@ -532,14 +533,9 @@ public final class Constants {
* session timeout
*/
public static final int SESSION_TIME_OUT = 7200;
- public static final int MAX_FILE_SIZE = 1024 * 1024 * 1024;
public static final String UDF = "UDF";
public static final String CLASS = "class";
- /**
- * default worker group
- */
- public static final String DEFAULT_WORKER_GROUP = "default";
/**
* authorize writable perm
*/
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtils.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtils.java
new file mode 100644
index 0000000000..89ea647f75
--- /dev/null
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.utils;
+
+public class EnvironmentUtils {
+
+ private static final long EMPTY_ENVIRONMENT_CODE = -1L;
+
+ /**
+ * Check if the environment code is empty (we should use null instead of
-1, this is used to comply with the original code)
+ *
+ * @return true if the environment code is empty, false otherwise
+ */
+ public static boolean isEnvironmentCodeEmpty(Long environmentCode) {
+ return environmentCode == null || environmentCode <= 0;
+ }
+
+ /**
+ * Get the empty environment code
+ */
+ public static Long getDefaultEnvironmentCode() {
+ return EMPTY_ENVIRONMENT_CODE;
+ }
+
+ /**
+ * Get the environment code or the default environment code if the
environment code is empty
+ */
+ public static Long getEnvironmentCodeOrDefault(Long environmentCode) {
+ return getEnvironmentCodeOrDefault(environmentCode,
getDefaultEnvironmentCode());
+ }
+
+ /**
+ * Get the environment code or the default environment code if the
environment code is empty
+ */
+ public static Long getEnvironmentCodeOrDefault(Long environmentCode, Long
defaultEnvironmentCode) {
+ return isEnvironmentCodeEmpty(environmentCode) ?
defaultEnvironmentCode : environmentCode;
+ }
+
+}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtils.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtils.java
new file mode 100644
index 0000000000..2436d45a02
--- /dev/null
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.utils;
+
+import org.apache.commons.lang3.StringUtils;
+
+public class WorkerGroupUtils {
+
+ private static final String DEFAULT_WORKER_GROUP = "default";
+
+ /**
+ * Check if the worker group is empty, if the worker group is default, it
is considered empty
+ */
+ public static boolean isWorkerGroupEmpty(String workerGroup) {
+ return StringUtils.isEmpty(workerGroup) ||
getDefaultWorkerGroup().equals(workerGroup);
+ }
+
+ public static String getWorkerGroupOrDefault(String workerGroup) {
+ return getWorkerGroupOrDefault(workerGroup, getDefaultWorkerGroup());
+ }
+
+ public static String getWorkerGroupOrDefault(String workerGroup, String
defaultWorkerGroup) {
+ return isWorkerGroupEmpty(workerGroup) ? defaultWorkerGroup :
workerGroup;
+ }
+
+ public static String getDefaultWorkerGroup() {
+ return DEFAULT_WORKER_GROUP;
+ }
+
+}
diff --git
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
index 2105885fa3..3f4b0768aa 100644
---
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
+++
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.dao.mapper;
import static com.google.common.truth.Truth.assertThat;
-import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
@@ -32,6 +31,7 @@ import org.apache.dolphinscheduler.dao.BaseDaoTest;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.CommandCount;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import java.util.Date;
import java.util.HashMap;
@@ -303,7 +303,7 @@ public class CommandMapperTest extends BaseDaoTest {
command.setProcessInstancePriority(Priority.MEDIUM);
command.setStartTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
command.setUpdateTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
- command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
+ command.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
command.setProcessInstanceId(0);
command.setProcessDefinitionVersion(0);
commandMapper.insert(command);
diff --git
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java
index 85867ef3b5..1bc55f8191 100644
---
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java
+++
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.dao.repository.impl;
import static com.google.common.truth.Truth.assertThat;
-import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
@@ -29,6 +28,7 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.BaseDaoTest;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.repository.CommandDao;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.commons.lang3.RandomUtils;
@@ -80,7 +80,7 @@ class CommandDaoImplTest extends BaseDaoTest {
command.setProcessInstancePriority(Priority.MEDIUM);
command.setStartTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
command.setUpdateTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
- command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
+ command.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
command.setProcessInstanceId(0);
command.setProcessDefinitionVersion(0);
commandDao.insert(command);
diff --git
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtilsTest.java
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtilsTest.java
new file mode 100644
index 0000000000..dd5356169d
--- /dev/null
+++
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtilsTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.utils;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Unit tests for {@link EnvironmentUtils}: the empty-code check, the default code, and both
+ * {@code getEnvironmentCodeOrDefault} overloads.
+ */
+class EnvironmentUtilsTest {
+
+    // The codes 0 and -1 are both expected to be reported as empty.
+    @ParameterizedTest
+    @ValueSource(longs = {0, -1})
+    void testIsEnvironmentCodeEmpty_emptyEnvironmentCode(Long environmentCode) {
+        boolean empty = EnvironmentUtils.isEnvironmentCodeEmpty(environmentCode);
+        assertThat(empty).isTrue();
+    }
+
+    // Any other code (here 123) is expected to be reported as non-empty.
+    @ParameterizedTest
+    @ValueSource(longs = {123})
+    void testIsEnvironmentCodeEmpty_nonEmptyEnvironmentCode(Long environmentCode) {
+        boolean empty = EnvironmentUtils.isEnvironmentCodeEmpty(environmentCode);
+        assertThat(empty).isFalse();
+    }
+
+    // The default environment code is expected to be -1.
+    @Test
+    void testGetDefaultEnvironmentCode() {
+        Long defaultCode = EnvironmentUtils.getDefaultEnvironmentCode();
+        assertThat(defaultCode).isEqualTo(-1L);
+    }
+
+    // One-argument overload: an empty code (0 or -1) is expected to resolve to -1.
+    @ParameterizedTest
+    @ValueSource(longs = {0, -1})
+    void testGetEnvironmentCodeOrDefault_emptyEnvironmentCode(Long environmentCode) {
+        Long resolved = EnvironmentUtils.getEnvironmentCodeOrDefault(environmentCode);
+        assertThat(resolved).isEqualTo(-1L);
+    }
+
+    // One-argument overload: a non-empty code is expected to be returned unchanged.
+    @ParameterizedTest
+    @ValueSource(longs = {123})
+    void testGetEnvironmentCodeOrDefault_nonEmptyEnvironmentCode(Long environmentCode) {
+        Long resolved = EnvironmentUtils.getEnvironmentCodeOrDefault(environmentCode);
+        assertThat(resolved).isEqualTo(environmentCode);
+    }
+
+    // Two-argument overload: a missing (null) or -1 code is expected to resolve to the supplied default.
+    @ParameterizedTest
+    @CsvSource(value = {",123", "-1,123"})
+    void testGetEnvironmentCodeOrDefault_withDefaultValue_emptyEnvironmentCode(Long environmentCode,
+            Long defaultValue) {
+        Long resolved = EnvironmentUtils.getEnvironmentCodeOrDefault(environmentCode, defaultValue);
+        assertThat(resolved).isEqualTo(defaultValue);
+    }
+
+    // Two-argument overload: a non-empty code is expected to win over the supplied default.
+    @ParameterizedTest
+    @CsvSource(value = {"1,123"})
+    void testGetEnvironmentCodeOrDefault_withDefaultValue_nonEmptyEnvironmentCode(Long environmentCode,
+            Long defaultValue) {
+        Long resolved = EnvironmentUtils.getEnvironmentCodeOrDefault(environmentCode, defaultValue);
+        assertThat(resolved).isEqualTo(environmentCode);
+    }
+}
diff --git
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtilsTest.java
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtilsTest.java
new file mode 100644
index 0000000000..60bf74695d
--- /dev/null
+++
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtilsTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.utils;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Unit tests for {@link WorkerGroupUtils}: the empty-group check, the default group name, and both
+ * {@code getWorkerGroupOrDefault} overloads.
+ */
+class WorkerGroupUtilsTest {
+
+    // The empty string and the name "default" are both expected to count as an empty worker group.
+    @ParameterizedTest
+    @ValueSource(strings = {"", "default"})
+    void testIsWorkerGroupEmpty_emptyWorkerGroup(String workerGroup) {
+        assertThat(WorkerGroupUtils.isWorkerGroupEmpty(workerGroup)).isTrue();
+    }
+
+    // Other names, including ones that merely start with "default", are expected to be non-empty.
+    @ParameterizedTest
+    @ValueSource(strings = {"123", "default1"})
+    void testIsWorkerGroupEmpty_nonEmptyWorkerGroup(String workerGroup) {
+        assertThat(WorkerGroupUtils.isWorkerGroupEmpty(workerGroup)).isFalse();
+    }
+
+    // One-argument overload: an empty group is expected to resolve to the default worker group.
+    @ParameterizedTest
+    @ValueSource(strings = {"", "default"})
+    void testGetWorkerGroupOrDefault_emptyWorkerGroup(String workerGroup) {
+        assertThat(WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup))
+                .isEqualTo(WorkerGroupUtils.getDefaultWorkerGroup());
+    }
+
+    // One-argument overload: a non-empty group is expected to be returned unchanged.
+    @ParameterizedTest
+    @ValueSource(strings = {"test"})
+    void testGetWorkerGroupOrDefault_nonEmptyWorkerGroup(String workerGroup) {
+        assertThat(WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup)).isEqualTo(workerGroup);
+    }
+
+    // Two-argument overload: a missing (null) or "default" group is expected to resolve to the supplied default.
+    @ParameterizedTest
+    @CsvSource(value = {",test", "default,test"})
+    void testGetWorkerGroupOrDefault_withDefaultValue_emptyWorkerGroup(String workerGroup, String defaultValue) {
+        assertThat(WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup, defaultValue)).isEqualTo(defaultValue);
+    }
+
+    // Two-argument overload: a non-empty group is expected to win over the supplied default.
+    // defaultValue must be passed here; calling the one-argument overload would leave this path untested.
+    @ParameterizedTest
+    @CsvSource(value = {"test1,test"})
+    void testGetWorkerGroupOrDefault_withDefaultValue_nonEmptyWorkerGroup(String workerGroup, String defaultValue) {
+        assertThat(WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup, defaultValue)).isEqualTo(workerGroup);
+    }
+
+    // The default worker group name is expected to be "default".
+    @Test
+    void getDefaultWorkerGroup() {
+        assertThat(WorkerGroupUtils.getDefaultWorkerGroup()).isEqualTo("default");
+    }
+}
diff --git
a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/WorkflowJavaTaskE2ETest.java
b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/WorkflowJavaTaskE2ETest.java
index d61a9ddd2a..77c4e554bc 100644
---
a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/WorkflowJavaTaskE2ETest.java
+++
b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/WorkflowJavaTaskE2ETest.java
@@ -92,8 +92,8 @@ public class WorkflowJavaTaskE2ETest {
.goToNav(SecurityPage.class)
.goToTab(UserPage.class);
- new WebDriverWait(userPage.driver(),
Duration.ofSeconds(20)).until(ExpectedConditions.visibilityOfElementLocated(
- new By.ByClassName("name")));
+ new WebDriverWait(userPage.driver(), Duration.ofSeconds(20))
+ .until(ExpectedConditions.visibilityOfElementLocated(new
By.ByClassName("name")));
userPage.update(user, user, email, phone, tenant)
.goToNav(ProjectPage.class)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
deleted file mode 100644
index 8ad4013868..0000000000
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.dispatch.context;
-
-import static
org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_WORKER_GROUP;
-
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.extract.base.utils.Host;
-import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
-
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-public class ExecutionContext {
-
- private Host host;
-
- private TaskInstance taskInstance;
-
- private ExecutorType executorType;
-
- /**
- * worker group
- */
- private String workerGroup;
-
- public ExecutionContext(ExecutorType executorType, TaskInstance
taskInstance) {
- this(executorType, DEFAULT_WORKER_GROUP, taskInstance);
- }
-
- public ExecutionContext(ExecutorType executorType, String workerGroup,
TaskInstance taskInstance) {
- this.executorType = executorType;
- this.workerGroup = workerGroup;
- this.taskInstance = taskInstance;
- }
-}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index f066f0403d..d532168b2a 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.Event.Type;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
@@ -36,7 +37,6 @@ import
org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
@@ -273,8 +273,8 @@ public class ServerNodeManager implements InitializingBean {
.filter(workerNodeInfo::containsKey).collect(Collectors.toSet());
tmpWorkerGroupMappings.put(workerGroupName, activeWorkerNodes);
}
- if
(!tmpWorkerGroupMappings.containsKey(Constants.DEFAULT_WORKER_GROUP)) {
- tmpWorkerGroupMappings.put(Constants.DEFAULT_WORKER_GROUP,
workerNodeInfo.keySet());
+ if
(!tmpWorkerGroupMappings.containsKey(WorkerGroupUtils.getDefaultWorkerGroup()))
{
+
tmpWorkerGroupMappings.put(WorkerGroupUtils.getDefaultWorkerGroup(),
workerNodeInfo.keySet());
}
} finally {
workerNodeInfoReadLock.unlock();
@@ -307,9 +307,7 @@ public class ServerNodeManager implements InitializingBean {
public Set<String> getWorkerGroupNodes(String workerGroup) throws
WorkerGroupNotFoundException {
workerGroupReadLock.lock();
try {
- if (StringUtils.isEmpty(workerGroup)) {
- workerGroup = Constants.DEFAULT_WORKER_GROUP;
- }
+ workerGroup =
WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup);
Set<String> nodes = workerGroupNodes.get(workerGroup);
if (nodes == null) {
throw new
WorkerGroupNotFoundException(String.format("WorkerGroup: %s is invalidated",
workerGroup));
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
index 2f61507e72..b09d9f1b1a 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
@@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.server.master.runner;
-import static
org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_WORKER_GROUP;
-
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
@@ -31,6 +29,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.dao.utils.EnvironmentUtils;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import
org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import
org.apache.dolphinscheduler.extract.master.transportor.StreamingTaskTriggerRequest;
import
org.apache.dolphinscheduler.extract.worker.ITaskInstanceExecutionEventAckListener;
@@ -270,12 +270,11 @@ public class StreamTaskExecuteRunnable implements
Runnable {
// task dry run flag
taskInstance.setDryRun(taskExecuteStartMessage.getDryRun());
-
taskInstance.setWorkerGroup(StringUtils.isBlank(taskDefinition.getWorkerGroup())
? DEFAULT_WORKER_GROUP
- : taskDefinition.getWorkerGroup());
- taskInstance.setEnvironmentCode(
- taskDefinition.getEnvironmentCode() == 0 ? -1 :
taskDefinition.getEnvironmentCode());
+
taskInstance.setWorkerGroup(WorkerGroupUtils.getWorkerGroupOrDefault(taskDefinition.getWorkerGroup()));
+ taskInstance
+
.setEnvironmentCode(EnvironmentUtils.getEnvironmentCodeOrDefault(taskDefinition.getEnvironmentCode()));
- if (!taskInstance.getEnvironmentCode().equals(-1L)) {
+ if
(!EnvironmentUtils.isEnvironmentCodeEmpty(taskInstance.getEnvironmentCode())) {
Environment environment =
processService.findEnvironmentByCode(taskInstance.getEnvironmentCode());
if (Objects.nonNull(environment) &&
StringUtils.isNotEmpty(environment.getConfig())) {
taskInstance.setEnvironmentConfig(environment.getConfig());
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 3177947186..6cccef88e1 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -25,8 +25,9 @@ import static
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.C
import static
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_NODES;
import static
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_PARAMS;
import static org.apache.dolphinscheduler.common.constants.Constants.COMMA;
-import static
org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_WORKER_GROUP;
import static
org.apache.dolphinscheduler.common.constants.DateConstants.YYYY_MM_DD_HH_MM_SS;
+import static
org.apache.dolphinscheduler.dao.utils.EnvironmentUtils.getEnvironmentCodeOrDefault;
+import static
org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils.getWorkerGroupOrDefault;
import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
import org.apache.dolphinscheduler.common.constants.Constants;
@@ -51,7 +52,9 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.dao.utils.EnvironmentUtils;
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import
org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
import
org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostRequest;
@@ -1119,25 +1122,22 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority());
}
- String processWorkerGroup = processInstance.getWorkerGroup();
- processWorkerGroup = StringUtils.isBlank(processWorkerGroup) ?
DEFAULT_WORKER_GROUP : processWorkerGroup;
- String taskWorkerGroup =
- StringUtils.isBlank(taskNode.getWorkerGroup()) ?
processWorkerGroup : taskNode.getWorkerGroup();
+ String processWorkerGroup =
getWorkerGroupOrDefault(processInstance.getWorkerGroup());
+ String taskWorkerGroup =
getWorkerGroupOrDefault(taskNode.getWorkerGroup());
- Long processEnvironmentCode =
- Objects.isNull(processInstance.getEnvironmentCode()) ? -1 :
processInstance.getEnvironmentCode();
- Long taskEnvironmentCode =
- Objects.isNull(taskNode.getEnvironmentCode()) ?
processEnvironmentCode : taskNode.getEnvironmentCode();
+ Long processEnvironmentCode =
getEnvironmentCodeOrDefault(processInstance.getEnvironmentCode());
+ Long taskEnvironmentCode =
getEnvironmentCodeOrDefault(taskNode.getEnvironmentCode());
- if (!processWorkerGroup.equals(DEFAULT_WORKER_GROUP) &&
taskWorkerGroup.equals(DEFAULT_WORKER_GROUP)) {
+ if (WorkerGroupUtils.isWorkerGroupEmpty(taskWorkerGroup)) {
+ // If the task workerGroup is empty, then use the workflow
workerGroup/environment
taskInstance.setWorkerGroup(processWorkerGroup);
- taskInstance.setEnvironmentCode(processEnvironmentCode);
+
taskInstance.setEnvironmentCode(getEnvironmentCodeOrDefault(taskEnvironmentCode,
processEnvironmentCode));
} else {
taskInstance.setWorkerGroup(taskWorkerGroup);
- taskInstance.setEnvironmentCode(taskEnvironmentCode);
+
taskInstance.setEnvironmentCode(getEnvironmentCodeOrDefault(taskEnvironmentCode,
processEnvironmentCode));
}
- if (!taskInstance.getEnvironmentCode().equals(-1L)) {
+ if
(!EnvironmentUtils.isEnvironmentCodeEmpty(taskInstance.getEnvironmentCode())) {
Environment environment =
processService.findEnvironmentByCode(taskInstance.getEnvironmentCode());
if (Objects.nonNull(environment) &&
StringUtils.isNotEmpty(environment.getConfig())) {
taskInstance.setEnvironmentConfig(environment.getConfig());
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtilsTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtilsTest.java
index d52c436add..5b4bc18ca1 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtilsTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtilsTest.java
@@ -25,6 +25,7 @@ import
org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import java.sql.Date;
@@ -52,7 +53,7 @@ public class WorkflowInstanceUtilsTest {
workflowInstance.setDryRun(0);
workflowInstance.setTenantCode("default");
workflowInstance.setRestartTime(Date.valueOf("2023-08-01"));
- workflowInstance.setWorkerGroup("default");
+
workflowInstance.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
workflowInstance.setStartTime(Date.valueOf("2023-08-01"));
workflowInstance.setEndTime(Date.valueOf("2023-08-01"));
Assertions.assertEquals("\n"
diff --git
a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java
b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java
index c6bd4cf93f..8ce6480c45 100644
---
a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java
+++
b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java
@@ -17,17 +17,15 @@
package org.apache.dolphinscheduler.scheduler.quartz;
-import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.commons.lang3.StringUtils;
-
import java.util.Date;
import lombok.extern.slf4j.Slf4j;
@@ -93,8 +91,7 @@ public class ProcessScheduleTask extends QuartzJobBean {
command.setScheduleTime(scheduledFireTime);
command.setStartTime(fireTime);
command.setWarningGroupId(schedule.getWarningGroupId());
- String workerGroup = StringUtils.isEmpty(schedule.getWorkerGroup()) ?
Constants.DEFAULT_WORKER_GROUP
- : schedule.getWorkerGroup();
+ String workerGroup =
WorkerGroupUtils.getWorkerGroupOrDefault(schedule.getWorkerGroup());
command.setWorkerGroup(workerGroup);
command.setTenantCode(schedule.getTenantCode());
command.setEnvironmentCode(schedule.getEnvironmentCode());
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 635948fc86..470226bec0 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -107,6 +107,8 @@ import
org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.DqRuleUtils;
+import org.apache.dolphinscheduler.dao.utils.EnvironmentUtils;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import
org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.common.ILogService;
import
org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener;
@@ -573,10 +575,8 @@ public class ProcessServiceImpl implements ProcessService {
// set process instance priority
processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
- String workerGroup =
StringUtils.defaultIfEmpty(command.getWorkerGroup(),
Constants.DEFAULT_WORKER_GROUP);
- processInstance.setWorkerGroup(workerGroup);
- processInstance
-
.setEnvironmentCode(Objects.isNull(command.getEnvironmentCode()) ? -1 :
command.getEnvironmentCode());
+
processInstance.setWorkerGroup(WorkerGroupUtils.getWorkerGroupOrDefault(command.getWorkerGroup()));
+
processInstance.setEnvironmentCode(EnvironmentUtils.getEnvironmentCodeOrDefault(command.getEnvironmentCode()));
processInstance.setTimeout(processDefinition.getTimeout());
processInstance.setTenantCode(command.getTenantCode());
return processInstance;