This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.2.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.2.2-prepare by this push:
     new e65208b34e Fix task cannot use workflow's environment (#16199) (#16206)
e65208b34e is described below

commit e65208b34ec8d5cacfcd473969a1faa6d38cef27
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Jun 24 23:00:18 2024 +0800

    Fix task cannot use workflow's environment (#16199) (#16206)
    
    (cherry picked from commit b34fe4604423ff209e42a6a65ac2e75770625143)
---
 .../api/dto/task/TaskCreateRequest.java            |  4 +-
 .../api/service/WorkerGroupService.java            |  8 ---
 .../service/impl/ProcessDefinitionServiceImpl.java |  4 +-
 .../ProjectWorkerGroupRelationServiceImpl.java     |  3 +-
 .../api/service/impl/WorkerGroupServiceImpl.java   | 27 +-------
 .../api/service/ExecuteFunctionServiceTest.java    | 29 +++++----
 .../api/service/ProcessDefinitionServiceTest.java  |  3 +-
 .../api/service/WorkerGroupServiceTest.java        | 44 -------------
 .../common/constants/Constants.java                |  6 +-
 .../dao/utils/EnvironmentUtils.java                | 54 ++++++++++++++++
 .../dao/utils/WorkerGroupUtils.java                | 45 ++++++++++++++
 .../dao/mapper/CommandMapperTest.java              |  4 +-
 .../dao/repository/impl/CommandDaoImplTest.java    |  4 +-
 .../dao/utils/EnvironmentUtilsTest.java            | 72 ++++++++++++++++++++++
 .../dao/utils/WorkerGroupUtilsTest.java            | 70 +++++++++++++++++++++
 .../e2e/cases/WorkflowJavaTaskE2ETest.java         |  4 +-
 .../master/dispatch/context/ExecutionContext.java  | 57 -----------------
 .../server/master/registry/ServerNodeManager.java  | 10 ++-
 .../master/runner/StreamTaskExecuteRunnable.java   | 13 ++--
 .../master/runner/WorkflowExecuteRunnable.java     | 26 ++++----
 .../master/utils/WorkflowInstanceUtilsTest.java    |  3 +-
 .../scheduler/quartz/ProcessScheduleTask.java      |  7 +--
 .../service/process/ProcessServiceImpl.java        |  8 +--
 23 files changed, 306 insertions(+), 199 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java
index 1e651ee6e3..bb362962fa 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java
@@ -19,11 +19,11 @@ package org.apache.dolphinscheduler.api.dto.task;
 
 import static org.apache.dolphinscheduler.common.constants.Constants.VERSION_FIRST;
 
-import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
 
 import java.util.Date;
@@ -107,7 +107,7 @@ public class TaskCreateRequest {
         taskDefinition.setProjectCode(this.projectCode);
         taskDefinition.setTaskType(this.taskType);
         taskDefinition.setTaskParams(this.taskParams);
-        taskDefinition.setWorkerGroup(this.workerGroup == null ? Constants.DEFAULT_WORKER_GROUP : this.workerGroup);
+        taskDefinition.setWorkerGroup(WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup));
         taskDefinition.setEnvironmentCode(this.environmentCode);
         taskDefinition.setFailRetryTimes(this.failRetryTimes);
         taskDefinition.setFailRetryInterval(this.failRetryInterval);
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
index 2c87e4be2e..b85d3912cb 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
@@ -18,7 +18,6 @@
 package org.apache.dolphinscheduler.api.service;
 
 import org.apache.dolphinscheduler.api.utils.Result;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
 
 import java.util.List;
@@ -77,13 +76,6 @@ public interface WorkerGroupService {
      */
     Map<String, Object> getWorkerAddressList();
 
-    /**
-     * Get task instance's worker group
-     * @param taskInstance task instance
-     * @return worker group
-     */
-    String getTaskWorkerGroup(TaskInstance taskInstance);
-
     /**
      * Query worker group by process definition codes
      * @param processDefinitionCodeList processDefinitionCodeList
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index e6893b04fb..6a4251cef1 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -34,7 +34,6 @@ import static 
org.apache.dolphinscheduler.api.enums.Status.PROCESS_DEFINE_NOT_EX
 import static 
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
 import static 
org.apache.dolphinscheduler.common.constants.Constants.COPY_SUFFIX;
 import static org.apache.dolphinscheduler.common.constants.Constants.DATA_LIST;
-import static 
org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_WORKER_GROUP;
 import static 
org.apache.dolphinscheduler.common.constants.Constants.GLOBAL_PARAMS;
 import static 
org.apache.dolphinscheduler.common.constants.Constants.IMPORT_SUFFIX;
 import static 
org.apache.dolphinscheduler.common.constants.Constants.LOCAL_PARAMS;
@@ -109,6 +108,7 @@ import 
org.apache.dolphinscheduler.dao.model.PageListingResult;
 import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
 import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
 import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
 import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
@@ -1390,7 +1390,7 @@ public class ProcessDefinitionServiceImpl extends 
BaseServiceImpl implements Pro
         taskDefinition.setFailRetryTimes(0);
         taskDefinition.setFailRetryInterval(0);
         taskDefinition.setTimeoutFlag(TimeoutFlag.CLOSE);
-        taskDefinition.setWorkerGroup(DEFAULT_WORKER_GROUP);
+        
taskDefinition.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
         taskDefinition.setTaskPriority(Priority.MEDIUM);
         taskDefinition.setEnvironmentCode(-1);
         taskDefinition.setTimeout(0);
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java
index 31dc113dbb..c11915667f 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java
@@ -32,6 +32,7 @@ import 
org.apache.dolphinscheduler.dao.mapper.ProjectWorkerGroupMapper;
 import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections4.SetUtils;
@@ -118,7 +119,7 @@ public class ProjectWorkerGroupRelationServiceImpl extends 
BaseServiceImpl
                 
workerGroupMapper.queryAllWorkerGroup().stream().map(WorkerGroup::getName).collect(
                         Collectors.toSet());
 
-        workerGroupNames.add(Constants.DEFAULT_WORKER_GROUP);
+        workerGroupNames.add(WorkerGroupUtils.getDefaultWorkerGroup());
 
         Set<String> assignedWorkerGroupNames = new HashSet<>(workerGroups);
 
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index 9c98880740..4652347987 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -32,7 +32,6 @@ import 
org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
 import 
org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper;
@@ -41,6 +40,7 @@ import 
org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
 import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -357,12 +357,12 @@ public class WorkerGroupServiceImpl extends 
BaseServiceImpl implements WorkerGro
             workerGroups = workerGroupMapper.queryAllWorkerGroup();
         }
         boolean containDefaultWorkerGroups = workerGroups.stream()
-                .anyMatch(workerGroup -> Constants.DEFAULT_WORKER_GROUP.equals(workerGroup.getName()));
+                .anyMatch(workerGroup -> WorkerGroupUtils.isWorkerGroupEmpty(workerGroup.getName()));
         if (!containDefaultWorkerGroups) {
             // there doesn't exist a default WorkerGroup, we will add all 
worker to the default worker group.
             Set<String> activeWorkerNodes = 
registryClient.getServerNodeSet(RegistryNodeType.WORKER);
             WorkerGroup defaultWorkerGroup = new WorkerGroup();
-            defaultWorkerGroup.setName(Constants.DEFAULT_WORKER_GROUP);
+            
defaultWorkerGroup.setName(WorkerGroupUtils.getDefaultWorkerGroup());
             defaultWorkerGroup.setAddrList(String.join(Constants.COMMA, 
activeWorkerNodes));
             defaultWorkerGroup.setCreateTime(new Date());
             defaultWorkerGroup.setUpdateTime(new Date());
@@ -431,27 +431,6 @@ public class WorkerGroupServiceImpl extends 
BaseServiceImpl implements WorkerGro
         return result;
     }
 
-    @Override
-    public String getTaskWorkerGroup(TaskInstance taskInstance) {
-        if (taskInstance == null) {
-            return null;
-        }
-
-        String workerGroup = taskInstance.getWorkerGroup();
-
-        if (StringUtils.isNotEmpty(workerGroup)) {
-            return workerGroup;
-        }
-        int processInstanceId = taskInstance.getProcessInstanceId();
-        ProcessInstance processInstance = 
processService.findProcessInstanceById(processInstanceId);
-
-        if (processInstance != null) {
-            return processInstance.getWorkerGroup();
-        }
-        log.info("task : {} will use default worker group", 
taskInstance.getId());
-        return Constants.DEFAULT_WORKER_GROUP;
-    }
-
     @Override
     public Map<Long, String> 
queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList) {
         List<Schedule> processDefinitionScheduleList =
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
index 8f1869c1fb..04dabf5ead 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
@@ -67,6 +67,7 @@ import 
org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
 import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
 import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
 import org.apache.dolphinscheduler.service.command.CommandService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -271,7 +272,7 @@ public class ExecuteFunctionServiceTest {
                 null, null,
                 null, null, null,
                 RunMode.RUN_MODE_SERIAL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 
100L, 10, null, null,
+                Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), 
tenantCode, 100L, 10, null, null,
                 Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE, null,
@@ -298,7 +299,7 @@ public class ExecuteFunctionServiceTest {
                 null, "123456789,987654321",
                 null, null, null,
                 RunMode.RUN_MODE_SERIAL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 
100L, 110, null, null,
+                Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), 
tenantCode, 100L, 110, null, null,
                 Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE, null,
@@ -323,7 +324,7 @@ public class ExecuteFunctionServiceTest {
                     null, "1123456789,987654321",
                     null, null, null,
                     RunMode.RUN_MODE_SERIAL,
-                    Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 
100L, 110, null, 0,
+                    Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), 
tenantCode, 100L, 110, null, 0,
                     Constants.DRY_RUN_FLAG_NO,
                     Constants.TEST_FLAG_NO,
                     ComplementDependentMode.OFF_MODE, null,
@@ -354,14 +355,14 @@ public class ExecuteFunctionServiceTest {
         dependentProcessDefinition.setProcessDefinitionCode(2);
         dependentProcessDefinition.setProcessDefinitionVersion(1);
         dependentProcessDefinition.setTaskDefinitionCode(1);
-        
dependentProcessDefinition.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
+        
dependentProcessDefinition.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
         dependentProcessDefinition.setTaskParams(
                 
"{\"localParams\":[],\"resourceList\":[],\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"depTaskCode\":2,\"status\":\"SUCCESS\"}]}]},\"conditionResult\":{\"successNode\":[1],\"failedNode\":[1]}}");
         
Mockito.when(processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode))
                 .thenReturn(Lists.newArrayList(dependentProcessDefinition));
 
         Map<Long, String> processDefinitionWorkerGroupMap = new HashMap<>();
-        processDefinitionWorkerGroupMap.put(1L, 
Constants.DEFAULT_WORKER_GROUP);
+        processDefinitionWorkerGroupMap.put(1L, 
WorkerGroupUtils.getDefaultWorkerGroup());
         
Mockito.when(workerGroupService.queryWorkerGroupByProcessDefinitionCodes(Lists.newArrayList(1L)))
                 .thenReturn(processDefinitionWorkerGroupMap);
 
@@ -370,7 +371,7 @@ public class ExecuteFunctionServiceTest {
         command.setCommandType(CommandType.COMPLEMENT_DATA);
         command.setCommandParam(
                 "{\"StartNodeList\":\"1\",\"complementStartDate\":\"2020-01-01 
00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}");
-        command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
+        command.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
         command.setProcessDefinitionCode(processDefinitionCode);
         command.setExecutorId(1);
 
@@ -383,7 +384,7 @@ public class ExecuteFunctionServiceTest {
         childDependent.setProcessDefinitionCode(3);
         childDependent.setProcessDefinitionVersion(1);
         childDependent.setTaskDefinitionCode(4);
-        childDependent.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
+        
childDependent.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
         childDependent.setTaskParams(
                 
"{\"localParams\":[],\"resourceList\":[],\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"depTaskCode\":3,\"status\":\"SUCCESS\"}]}]},\"conditionResult\":{\"successNode\":[1],\"failedNode\":[1]}}");
         
Mockito.when(processService.queryDependentProcessDefinitionByProcessDefinitionCode(
@@ -409,7 +410,8 @@ public class ExecuteFunctionServiceTest {
                 null, null,
                 null, null, null,
                 RunMode.RUN_MODE_SERIAL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 
100L, 110, null, 2, Constants.DRY_RUN_FLAG_NO,
+                Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), 
tenantCode, 100L, 110, null, 2,
+                Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE, null,
                 false,
@@ -434,7 +436,7 @@ public class ExecuteFunctionServiceTest {
                 null, null,
                 null, null, null,
                 RunMode.RUN_MODE_SERIAL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 
100L, 110, null, null,
+                Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), 
tenantCode, 100L, 110, null, null,
                 Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE, null,
@@ -460,7 +462,8 @@ public class ExecuteFunctionServiceTest {
                 null, null,
                 null, null, null,
                 RunMode.RUN_MODE_PARALLEL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 
100L, 110, null, 2, Constants.DRY_RUN_FLAG_NO,
+                Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), 
tenantCode, 100L, 110, null, 2,
+                Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE, null,
                 false,
@@ -486,7 +489,7 @@ public class ExecuteFunctionServiceTest {
                 null, null,
                 null, null, null,
                 RunMode.RUN_MODE_PARALLEL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 
100L, 110, null, 15,
+                Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), 
tenantCode, 100L, 110, null, 15,
                 Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE, null,
@@ -514,7 +517,7 @@ public class ExecuteFunctionServiceTest {
                 null,
                 RunMode.RUN_MODE_PARALLEL,
                 Priority.LOW,
-                Constants.DEFAULT_WORKER_GROUP,
+                WorkerGroupUtils.getDefaultWorkerGroup(),
                 tenantCode,
                 100L,
                 110,
@@ -553,7 +556,7 @@ public class ExecuteFunctionServiceTest {
                 null, null,
                 null, null, 0,
                 RunMode.RUN_MODE_PARALLEL,
-                Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 
100L, 110, null, 15,
+                Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), 
tenantCode, 100L, 110, null, 15,
                 Constants.DRY_RUN_FLAG_NO,
                 Constants.TEST_FLAG_YES,
                 ComplementDependentMode.OFF_MODE, null,
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index ed3a5f639b..37af53dbbd 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -72,6 +72,7 @@ import 
org.apache.dolphinscheduler.dao.model.PageListingResult;
 import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
 import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
 import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
 import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -1143,7 +1144,7 @@ public class ProcessDefinitionServiceTest extends 
BaseServiceTestTool {
         schedule.setProcessInstancePriority(Priority.MEDIUM);
         schedule.setWarningType(WarningType.NONE);
         schedule.setWarningGroupId(1);
-        schedule.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
+        schedule.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
         return schedule;
     }
 
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
index 08a541c5bb..fce9aa3f1c 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
@@ -30,7 +30,6 @@ import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.UserType;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
 import 
org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper;
@@ -65,8 +64,6 @@ import org.slf4j.LoggerFactory;
 @MockitoSettings(strictness = Strictness.LENIENT)
 public class WorkerGroupServiceTest {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(WorkerGroupServiceTest.class);
-
     private static final Logger baseServiceLogger = 
LoggerFactory.getLogger(BaseServiceImpl.class);
 
     private static final Logger serviceLogger = 
LoggerFactory.getLogger(WorkerGroupService.class);
@@ -288,47 +285,6 @@ public class WorkerGroupServiceTest {
         Assertions.assertEquals("default", workerGroups.toArray()[0]);
     }
 
-    @Test
-    public void giveNull_whenGetTaskWorkerGroup_expectNull() {
-        String nullWorkerGroup = workerGroupService.getTaskWorkerGroup(null);
-        Assertions.assertNull(nullWorkerGroup);
-    }
-
-    @Test
-    public void 
giveCorrectTaskInstance_whenGetTaskWorkerGroup_expectTaskWorkerGroup() {
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setId(1);
-        taskInstance.setWorkerGroup("cluster1");
-
-        String workerGroup = 
workerGroupService.getTaskWorkerGroup(taskInstance);
-        Assertions.assertEquals("cluster1", workerGroup);
-    }
-
-    @Test
-    public void 
giveNullWorkerGroup_whenGetTaskWorkerGroup_expectProcessWorkerGroup() {
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setId(1);
-        taskInstance.setProcessInstanceId(1);
-        ProcessInstance processInstance = new ProcessInstance();
-        processInstance.setId(1);
-        processInstance.setWorkerGroup("cluster1");
-        
Mockito.when(processService.findProcessInstanceById(1)).thenReturn(processInstance);
-
-        String workerGroup = 
workerGroupService.getTaskWorkerGroup(taskInstance);
-        Assertions.assertEquals("cluster1", workerGroup);
-    }
-
-    @Test
-    public void 
giveNullTaskAndProcessWorkerGroup_whenGetTaskWorkerGroup_expectDefault() {
-        TaskInstance taskInstance = new TaskInstance();
-        taskInstance.setId(1);
-        taskInstance.setProcessInstanceId(1);
-        
Mockito.when(processService.findProcessInstanceById(1)).thenReturn(null);
-
-        String defaultWorkerGroup = 
workerGroupService.getTaskWorkerGroup(taskInstance);
-        Assertions.assertEquals(Constants.DEFAULT_WORKER_GROUP, 
defaultWorkerGroup);
-    }
-
     /**
      * get Group
      */
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
index 5c56818fa0..c74c7c7bbc 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
@@ -47,6 +47,7 @@ public final class Constants {
     public static final String RESOURCE_TYPE_UDF = "udfs";
 
     public static final String EMPTY_STRING = "";
+    public static final int MAX_FILE_SIZE = 1024 * 1024 * 1024;
 
     /**
      * resource.hdfs.fs.defaultFS
@@ -532,14 +533,9 @@ public final class Constants {
      * session timeout
      */
     public static final int SESSION_TIME_OUT = 7200;
-    public static final int MAX_FILE_SIZE = 1024 * 1024 * 1024;
     public static final String UDF = "UDF";
     public static final String CLASS = "class";
 
-    /**
-     * default worker group
-     */
-    public static final String DEFAULT_WORKER_GROUP = "default";
     /**
      * authorize writable perm
      */
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtils.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtils.java
new file mode 100644
index 0000000000..89ea647f75
--- /dev/null
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.utils;
+
+public class EnvironmentUtils {
+
+    private static final long EMPTY_ENVIRONMENT_CODE = -1L;
+
+    /**
+     * Check if the environment code is empty (we should use null instead of -1, this is used to comply with the original code)
+     *
+     * @return true if the environment code is empty, false otherwise
+     */
+    public static boolean isEnvironmentCodeEmpty(Long environmentCode) {
+        return environmentCode == null || environmentCode <= 0;
+    }
+
+    /**
+     * Get the empty environment code
+     */
+    public static Long getDefaultEnvironmentCode() {
+        return EMPTY_ENVIRONMENT_CODE;
+    }
+
+    /**
+     * Get the environment code or the default environment code if the environment code is empty
+     */
+    public static Long getEnvironmentCodeOrDefault(Long environmentCode) {
+        return getEnvironmentCodeOrDefault(environmentCode, getDefaultEnvironmentCode());
+    }
+
+    /**
+     * Get the environment code or the default environment code if the environment code is empty
+     */
+    public static Long getEnvironmentCodeOrDefault(Long environmentCode, Long defaultEnvironmentCode) {
+        return isEnvironmentCodeEmpty(environmentCode) ? defaultEnvironmentCode : environmentCode;
+    }
+
+}
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtils.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtils.java
new file mode 100644
index 0000000000..2436d45a02
--- /dev/null
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.utils;
+
+import org.apache.commons.lang3.StringUtils;
+
+public class WorkerGroupUtils {
+
+    private static final String DEFAULT_WORKER_GROUP = "default";
+
+    /**
+     * Check if the worker group is empty, if the worker group is default, it is considered empty
+     */
+    public static boolean isWorkerGroupEmpty(String workerGroup) {
+        return StringUtils.isEmpty(workerGroup) || getDefaultWorkerGroup().equals(workerGroup);
+    }
+
+    public static String getWorkerGroupOrDefault(String workerGroup) {
+        return getWorkerGroupOrDefault(workerGroup, getDefaultWorkerGroup());
+    }
+
+    public static String getWorkerGroupOrDefault(String workerGroup, String defaultWorkerGroup) {
+        return isWorkerGroupEmpty(workerGroup) ? defaultWorkerGroup : workerGroup;
+    }
+
+    public static String getDefaultWorkerGroup() {
+        return DEFAULT_WORKER_GROUP;
+    }
+
+}
diff --git 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
index 2105885fa3..3f4b0768aa 100644
--- 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
+++ 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.dao.mapper;
 
 import static com.google.common.truth.Truth.assertThat;
 
-import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Flag;
@@ -32,6 +31,7 @@ import org.apache.dolphinscheduler.dao.BaseDaoTest;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.CommandCount;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
 
 import java.util.Date;
 import java.util.HashMap;
@@ -303,7 +303,7 @@ public class CommandMapperTest extends BaseDaoTest {
         command.setProcessInstancePriority(Priority.MEDIUM);
         command.setStartTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
         command.setUpdateTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
-        command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
+        command.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
         command.setProcessInstanceId(0);
         command.setProcessDefinitionVersion(0);
         commandMapper.insert(command);
diff --git 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java
 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java
index 85867ef3b5..1bc55f8191 100644
--- 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java
+++ 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.dao.repository.impl;
 
 import static com.google.common.truth.Truth.assertThat;
 
-import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Priority;
@@ -29,6 +28,7 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.dao.BaseDaoTest;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.repository.CommandDao;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
 
 import org.apache.commons.lang3.RandomUtils;
 
@@ -80,7 +80,7 @@ class CommandDaoImplTest extends BaseDaoTest {
         command.setProcessInstancePriority(Priority.MEDIUM);
         command.setStartTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
         command.setUpdateTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
-        command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
+        command.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
         command.setProcessInstanceId(0);
         command.setProcessDefinitionVersion(0);
         commandDao.insert(command);
diff --git 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtilsTest.java
 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtilsTest.java
new file mode 100644
index 0000000000..dd5356169d
--- /dev/null
+++ 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtilsTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.utils;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class EnvironmentUtilsTest {
+
+    @ParameterizedTest
+    @ValueSource(longs = {0, -1})
+    void testIsEnvironmentCodeEmpty_emptyEnvironmentCode(Long environmentCode) 
{
+        
assertThat(EnvironmentUtils.isEnvironmentCodeEmpty(environmentCode)).isTrue();
+    }
+
+    @ParameterizedTest
+    @ValueSource(longs = {123})
+    void testIsEnvironmentCodeEmpty_nonEmptyEnvironmentCode(Long 
environmentCode) {
+        
assertThat(EnvironmentUtils.isEnvironmentCodeEmpty(environmentCode)).isFalse();
+    }
+
+    @Test
+    void testGetDefaultEnvironmentCode() {
+        
assertThat(EnvironmentUtils.getDefaultEnvironmentCode()).isEqualTo(-1L);
+    }
+
+    @ParameterizedTest
+    @ValueSource(longs = {0, -1})
+    void testGetEnvironmentCodeOrDefault_emptyEnvironmentCode(Long 
environmentCode) {
+        
assertThat(EnvironmentUtils.getEnvironmentCodeOrDefault(environmentCode)).isEqualTo(-1L);
+    }
+
+    @ParameterizedTest
+    @ValueSource(longs = {123})
+    void testGetEnvironmentCodeOrDefault_nonEmptyEnvironmentCode(Long 
environmentCode) {
+        
assertThat(EnvironmentUtils.getEnvironmentCodeOrDefault(environmentCode)).isEqualTo(environmentCode);
+    }
+
+    @ParameterizedTest
+    @CsvSource(value = {",123", "-1,123"})
+    void 
testGetEnvironmentCodeOrDefault_withDefaultValue_emptyEnvironmentCode(Long 
environmentCode,
+                                                                               
Long defaultValue) {
+        
assertThat(EnvironmentUtils.getEnvironmentCodeOrDefault(environmentCode, 
defaultValue)).isEqualTo(defaultValue);
+    }
+
+    @ParameterizedTest
+    @CsvSource(value = {"1,123"})
+    void 
testGetEnvironmentCodeOrDefault_withDefaultValue_nonEmptyEnvironmentCode(Long 
environmentCode,
+                                                                               
   Long defaultValue) {
+        
assertThat(EnvironmentUtils.getEnvironmentCodeOrDefault(environmentCode, 
defaultValue))
+                .isEqualTo(environmentCode);
+    }
+}
diff --git 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtilsTest.java
 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtilsTest.java
new file mode 100644
index 0000000000..60bf74695d
--- /dev/null
+++ 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtilsTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.utils;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class WorkerGroupUtilsTest {
+
+    @ParameterizedTest
+    @ValueSource(strings = {"", "default"})
+    void testIsWorkerGroupEmpty_emptyWorkerGroup(String workerGroup) {
+        assertThat(WorkerGroupUtils.isWorkerGroupEmpty(workerGroup)).isTrue();
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"123", "default1"})
+    void testIsWorkerGroupEmpty_nonEmptyWorkerGroup(String workerGroup) {
+        assertThat(WorkerGroupUtils.isWorkerGroupEmpty(workerGroup)).isFalse();
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"", "default"})
+    void testGetWorkerGroupOrDefault_emptyWorkerGroup(String workerGroup) {
+        assertThat(WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup))
+                .isEqualTo(WorkerGroupUtils.getDefaultWorkerGroup());
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"test"})
+    void testGetWorkerGroupOrDefault_nonEmptyWorkerGroup(String workerGroup) {
+        
assertThat(WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup)).isEqualTo(workerGroup);
+    }
+
+    @ParameterizedTest
+    @CsvSource(value = {",test", "default,test"})
+    void testGetWorkerGroupOrDefault_withDefaultValue_emptyWorkerGroup(String 
workerGroup, String defaultValue) {
+        assertThat(WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup, 
defaultValue)).isEqualTo(defaultValue);
+    }
+
+    @ParameterizedTest
+    @CsvSource(value = {"test1,test"})
+    void testGetWorkerGroupOrDefault_withDefaultValue_nonEmptyWorkerGroup(String workerGroup, String defaultValue) {
+        assertThat(WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup, defaultValue)).isEqualTo(workerGroup);
+    }
+
+    @Test
+    void getDefaultWorkerGroup() {
+        
assertThat(WorkerGroupUtils.getDefaultWorkerGroup()).isEqualTo("default");
+    }
+}
diff --git 
a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/WorkflowJavaTaskE2ETest.java
 
b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/WorkflowJavaTaskE2ETest.java
index d61a9ddd2a..77c4e554bc 100644
--- 
a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/WorkflowJavaTaskE2ETest.java
+++ 
b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/WorkflowJavaTaskE2ETest.java
@@ -92,8 +92,8 @@ public class WorkflowJavaTaskE2ETest {
                 .goToNav(SecurityPage.class)
                 .goToTab(UserPage.class);
 
-        new WebDriverWait(userPage.driver(), 
Duration.ofSeconds(20)).until(ExpectedConditions.visibilityOfElementLocated(
-                new By.ByClassName("name")));
+        new WebDriverWait(userPage.driver(), Duration.ofSeconds(20))
+                .until(ExpectedConditions.visibilityOfElementLocated(new 
By.ByClassName("name")));
 
         userPage.update(user, user, email, phone, tenant)
                 .goToNav(ProjectPage.class)
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
deleted file mode 100644
index 8ad4013868..0000000000
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.dispatch.context;
-
-import static 
org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_WORKER_GROUP;
-
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.extract.base.utils.Host;
-import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
-
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-public class ExecutionContext {
-
-    private Host host;
-
-    private TaskInstance taskInstance;
-
-    private ExecutorType executorType;
-
-    /**
-     * worker group
-     */
-    private String workerGroup;
-
-    public ExecutionContext(ExecutorType executorType, TaskInstance 
taskInstance) {
-        this(executorType, DEFAULT_WORKER_GROUP, taskInstance);
-    }
-
-    public ExecutionContext(ExecutorType executorType, String workerGroup, 
TaskInstance taskInstance) {
-        this.executorType = executorType;
-        this.workerGroup = workerGroup;
-        this.taskInstance = taskInstance;
-    }
-}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index f066f0403d..d532168b2a 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
 import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
 import org.apache.dolphinscheduler.registry.api.Event;
 import org.apache.dolphinscheduler.registry.api.Event.Type;
 import org.apache.dolphinscheduler.registry.api.RegistryClient;
@@ -36,7 +37,6 @@ import 
org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.StringUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -273,8 +273,8 @@ public class ServerNodeManager implements InitializingBean {
                         
.filter(workerNodeInfo::containsKey).collect(Collectors.toSet());
                 tmpWorkerGroupMappings.put(workerGroupName, activeWorkerNodes);
             }
-            if 
(!tmpWorkerGroupMappings.containsKey(Constants.DEFAULT_WORKER_GROUP)) {
-                tmpWorkerGroupMappings.put(Constants.DEFAULT_WORKER_GROUP, 
workerNodeInfo.keySet());
+            if 
(!tmpWorkerGroupMappings.containsKey(WorkerGroupUtils.getDefaultWorkerGroup())) 
{
+                
tmpWorkerGroupMappings.put(WorkerGroupUtils.getDefaultWorkerGroup(), 
workerNodeInfo.keySet());
             }
         } finally {
             workerNodeInfoReadLock.unlock();
@@ -307,9 +307,7 @@ public class ServerNodeManager implements InitializingBean {
     public Set<String> getWorkerGroupNodes(String workerGroup) throws 
WorkerGroupNotFoundException {
         workerGroupReadLock.lock();
         try {
-            if (StringUtils.isEmpty(workerGroup)) {
-                workerGroup = Constants.DEFAULT_WORKER_GROUP;
-            }
+            workerGroup = 
WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup);
             Set<String> nodes = workerGroupNodes.get(workerGroup);
             if (nodes == null) {
                 throw new 
WorkerGroupNotFoundException(String.format("WorkerGroup: %s is invalidated", 
workerGroup));
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
index 2f61507e72..b09d9f1b1a 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
@@ -17,8 +17,6 @@
 
 package org.apache.dolphinscheduler.server.master.runner;
 
-import static 
org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_WORKER_GROUP;
-
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.Priority;
@@ -31,6 +29,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.dao.utils.EnvironmentUtils;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
 import 
org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
 import 
org.apache.dolphinscheduler.extract.master.transportor.StreamingTaskTriggerRequest;
 import 
org.apache.dolphinscheduler.extract.worker.ITaskInstanceExecutionEventAckListener;
@@ -270,12 +270,11 @@ public class StreamTaskExecuteRunnable implements 
Runnable {
         // task dry run flag
         taskInstance.setDryRun(taskExecuteStartMessage.getDryRun());
 
-        
taskInstance.setWorkerGroup(StringUtils.isBlank(taskDefinition.getWorkerGroup())
 ? DEFAULT_WORKER_GROUP
-                : taskDefinition.getWorkerGroup());
-        taskInstance.setEnvironmentCode(
-                taskDefinition.getEnvironmentCode() == 0 ? -1 : 
taskDefinition.getEnvironmentCode());
+        
taskInstance.setWorkerGroup(WorkerGroupUtils.getWorkerGroupOrDefault(taskDefinition.getWorkerGroup()));
+        taskInstance
+                
.setEnvironmentCode(EnvironmentUtils.getEnvironmentCodeOrDefault(taskDefinition.getEnvironmentCode()));
 
-        if (!taskInstance.getEnvironmentCode().equals(-1L)) {
+        if 
(!EnvironmentUtils.isEnvironmentCodeEmpty(taskInstance.getEnvironmentCode())) {
             Environment environment = 
processService.findEnvironmentByCode(taskInstance.getEnvironmentCode());
             if (Objects.nonNull(environment) && 
StringUtils.isNotEmpty(environment.getConfig())) {
                 taskInstance.setEnvironmentConfig(environment.getConfig());
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 3177947186..6cccef88e1 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -25,8 +25,9 @@ import static 
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.C
 import static 
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_NODES;
 import static 
org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_PARAMS;
 import static org.apache.dolphinscheduler.common.constants.Constants.COMMA;
-import static 
org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_WORKER_GROUP;
 import static 
org.apache.dolphinscheduler.common.constants.DateConstants.YYYY_MM_DD_HH_MM_SS;
+import static 
org.apache.dolphinscheduler.dao.utils.EnvironmentUtils.getEnvironmentCodeOrDefault;
+import static 
org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils.getWorkerGroupOrDefault;
 import static 
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
 
 import org.apache.dolphinscheduler.common.constants.Constants;
@@ -51,7 +52,9 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.dao.utils.EnvironmentUtils;
 import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
 import 
org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
 import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
 import 
org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostRequest;
@@ -1119,25 +1122,22 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
             
taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority());
         }
 
-        String processWorkerGroup = processInstance.getWorkerGroup();
-        processWorkerGroup = StringUtils.isBlank(processWorkerGroup) ? 
DEFAULT_WORKER_GROUP : processWorkerGroup;
-        String taskWorkerGroup =
-                StringUtils.isBlank(taskNode.getWorkerGroup()) ? 
processWorkerGroup : taskNode.getWorkerGroup();
+        String processWorkerGroup = 
getWorkerGroupOrDefault(processInstance.getWorkerGroup());
+        String taskWorkerGroup = 
getWorkerGroupOrDefault(taskNode.getWorkerGroup());
 
-        Long processEnvironmentCode =
-                Objects.isNull(processInstance.getEnvironmentCode()) ? -1 : 
processInstance.getEnvironmentCode();
-        Long taskEnvironmentCode =
-                Objects.isNull(taskNode.getEnvironmentCode()) ? 
processEnvironmentCode : taskNode.getEnvironmentCode();
+        Long processEnvironmentCode = 
getEnvironmentCodeOrDefault(processInstance.getEnvironmentCode());
+        Long taskEnvironmentCode = 
getEnvironmentCodeOrDefault(taskNode.getEnvironmentCode());
 
-        if (!processWorkerGroup.equals(DEFAULT_WORKER_GROUP) && 
taskWorkerGroup.equals(DEFAULT_WORKER_GROUP)) {
+        if (WorkerGroupUtils.isWorkerGroupEmpty(taskWorkerGroup)) {
+            // If the task workerGroup is empty, then use the workflow 
workerGroup/environment
             taskInstance.setWorkerGroup(processWorkerGroup);
-            taskInstance.setEnvironmentCode(processEnvironmentCode);
+            
taskInstance.setEnvironmentCode(getEnvironmentCodeOrDefault(taskEnvironmentCode,
 processEnvironmentCode));
         } else {
             taskInstance.setWorkerGroup(taskWorkerGroup);
-            taskInstance.setEnvironmentCode(taskEnvironmentCode);
+            
taskInstance.setEnvironmentCode(getEnvironmentCodeOrDefault(taskEnvironmentCode,
 processEnvironmentCode));
         }
 
-        if (!taskInstance.getEnvironmentCode().equals(-1L)) {
+        if 
(!EnvironmentUtils.isEnvironmentCodeEmpty(taskInstance.getEnvironmentCode())) {
             Environment environment = 
processService.findEnvironmentByCode(taskInstance.getEnvironmentCode());
             if (Objects.nonNull(environment) && 
StringUtils.isNotEmpty(environment.getConfig())) {
                 taskInstance.setEnvironmentConfig(environment.getConfig());
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtilsTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtilsTest.java
index d52c436add..5b4bc18ca1 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtilsTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtilsTest.java
@@ -25,6 +25,7 @@ import 
org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 
 import java.sql.Date;
@@ -52,7 +53,7 @@ public class WorkflowInstanceUtilsTest {
         workflowInstance.setDryRun(0);
         workflowInstance.setTenantCode("default");
         workflowInstance.setRestartTime(Date.valueOf("2023-08-01"));
-        workflowInstance.setWorkerGroup("default");
+        
workflowInstance.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
         workflowInstance.setStartTime(Date.valueOf("2023-08-01"));
         workflowInstance.setEndTime(Date.valueOf("2023-08-01"));
         Assertions.assertEquals("\n"
diff --git 
a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java
 
b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java
index c6bd4cf93f..8ce6480c45 100644
--- 
a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java
+++ 
b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java
@@ -17,17 +17,15 @@
 
 package org.apache.dolphinscheduler.scheduler.quartz;
 
-import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
 import org.apache.dolphinscheduler.service.command.CommandService;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
-import org.apache.commons.lang3.StringUtils;
-
 import java.util.Date;
 
 import lombok.extern.slf4j.Slf4j;
@@ -93,8 +91,7 @@ public class ProcessScheduleTask extends QuartzJobBean {
         command.setScheduleTime(scheduledFireTime);
         command.setStartTime(fireTime);
         command.setWarningGroupId(schedule.getWarningGroupId());
-        String workerGroup = StringUtils.isEmpty(schedule.getWorkerGroup()) ? 
Constants.DEFAULT_WORKER_GROUP
-                : schedule.getWorkerGroup();
+        String workerGroup = 
WorkerGroupUtils.getWorkerGroupOrDefault(schedule.getWorkerGroup());
         command.setWorkerGroup(workerGroup);
         command.setTenantCode(schedule.getTenantCode());
         command.setEnvironmentCode(schedule.getEnvironmentCode());
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 635948fc86..470226bec0 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -107,6 +107,8 @@ import 
org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
 import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.dao.utils.DqRuleUtils;
+import org.apache.dolphinscheduler.dao.utils.EnvironmentUtils;
+import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
 import 
org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
 import org.apache.dolphinscheduler.extract.common.ILogService;
 import 
org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener;
@@ -573,10 +575,8 @@ public class ProcessServiceImpl implements ProcessService {
 
         // set process instance priority
         
processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
-        String workerGroup = 
StringUtils.defaultIfEmpty(command.getWorkerGroup(), 
Constants.DEFAULT_WORKER_GROUP);
-        processInstance.setWorkerGroup(workerGroup);
-        processInstance
-                
.setEnvironmentCode(Objects.isNull(command.getEnvironmentCode()) ? -1 : 
command.getEnvironmentCode());
+        
processInstance.setWorkerGroup(WorkerGroupUtils.getWorkerGroupOrDefault(command.getWorkerGroup()));
+        
processInstance.setEnvironmentCode(EnvironmentUtils.getEnvironmentCodeOrDefault(command.getEnvironmentCode()));
         processInstance.setTimeout(processDefinition.getTimeout());
         processInstance.setTenantCode(command.getTenantCode());
         return processInstance;

Reply via email to