This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 28e849e2fc [Feature-16834] Start worker node with worker group label
(#16875)
28e849e2fc is described below
commit 28e849e2fc1a4263b5ef2c95f76ded3f1df7d262
Author: xiangzihao <[email protected]>
AuthorDate: Fri Dec 6 11:01:39 2024 +0800
[Feature-16834] Start worker node with worker group label (#16875)
---
.../resources/docker/ldap-login/application.yaml | 2 +
.../impl/WorkerGroupAuditOperatorImpl.java | 6 +-
.../controller/ProjectWorkerGroupController.java | 8 +-
.../ResourcePermissionCheckServiceImpl.java | 10 +-
.../service/ProjectWorkerGroupRelationService.java | 5 +-
.../api/service/WorkerGroupService.java | 3 +
.../ProjectWorkerGroupRelationServiceImpl.java | 105 ++++++++-----------
.../api/service/impl/WorkerGroupServiceImpl.java | 115 ++++++++++++---------
.../api/controller/WorkerGroupControllerTest.java | 10 +-
.../WorkerGroupResourcePermissionCheckTest.java | 6 +-
.../api/service/MonitorServiceTest.java | 2 +-
.../ProjectWorkerGroupRelationServiceTest.java | 76 +++++++-------
.../api/service/WorkerGroupServiceTest.java | 29 +++---
.../Server.java => enums/WorkerGroupSource.java} | 33 +++---
.../dolphinscheduler/common/model/Server.java | 7 +-
.../common/model/WorkerHeartBeat.java | 2 +-
.../dao/entity/WorkerGroupPageDetail.java | 22 ++--
.../dao/mapper/ProjectWorkerGroupMapper.java | 13 +++
.../dao/mapper/TaskDefinitionMapper.java | 5 +-
.../dao/mapper/WorkerGroupMapper.java | 9 +-
.../ProjectWorkerGroupDao.java} | 14 ++-
.../dao/repository/TaskDefinitionDao.java | 2 +
.../dao/repository/WorkerGroupDao.java | 10 ++
.../repository/impl/ProjectWorkerGroupDaoImpl.java | 68 ++++++++++++
.../dao/repository/impl/TaskDefinitionDaoImpl.java | 7 +-
.../dao/repository/impl/WorkerGroupDaoImpl.java | 25 +++++
.../dao/utils/WorkerGroupUtils.java | 3 +-
.../dao/mapper/ProjectWorkerGroupMapper.xml | 59 +++++++++++
.../dao/mapper/TaskDefinitionMapper.xml | 6 +-
.../dao/mapper/WorkerGroupMapper.xml | 10 ++
.../main/resources/sql/dolphinscheduler_mysql.sql | 3 +-
.../resources/sql/dolphinscheduler_postgresql.sql | 2 +-
.../3.3.0_schema/mysql/dolphinscheduler_ddl.sql | 4 +-
.../postgresql/dolphinscheduler_ddl.sql | 3 +-
.../dao/mapper/TaskDefinitionMapperTest.java | 6 +-
.../dolphinscheduler/e2e/cases/ProjectE2ETest.java | 24 ++++-
.../e2e/pages/project/ProjectPage.java | 81 ++++++++++++++-
.../server/master/cluster/ClusterManager.java | 4 +-
.../master/cluster/MasterServerMetadata.java | 3 +-
.../server/master/cluster/WorkerClusters.java | 30 +++++-
.../master/cluster/WorkerGroupChangeNotifier.java | 2 +-
.../master/cluster/WorkerServerMetadata.java | 4 +-
.../PhysicalTaskExecutorClientDelegator.java | 5 +-
.../master/cluster/WorkerServerMetadataTest.java | 2 +
.../src/test/resources/application.yaml | 1 +
.../registry/api/RegistryClient.java | 4 +-
dolphinscheduler-standalone-server/pom.xml | 7 ++
.../src/main/resources/application.yaml | 2 +
dolphinscheduler-ui/src/locales/en_US/security.ts | 1 +
dolphinscheduler-ui/src/locales/zh_CN/security.ts | 1 +
.../projects/list/components/use-worker-group.ts | 9 +-
.../list/components/worker-group-modal.tsx | 1 +
.../src/views/projects/list/index.tsx | 1 -
.../src/views/projects/list/use-table.ts | 2 +-
.../components/node/fields/use-worker-group.ts | 2 +-
.../views/projects/task/instance/batch-task.tsx | 1 -
.../views/projects/task/instance/stream-task.tsx | 1 -
.../views/projects/workflow/definition/index.tsx | 1 -
.../src/views/projects/workflow/instance/index.tsx | 1 -
.../security/worker-group-manage/use-table.ts | 5 +
.../server/worker/config/WorkerConfig.java | 1 +
.../server/worker/task/WorkerHeartBeatTask.java | 1 +
.../src/main/resources/application.yaml | 2 +
63 files changed, 612 insertions(+), 277 deletions(-)
diff --git
a/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/resources/docker/ldap-login/application.yaml
b/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/resources/docker/ldap-login/application.yaml
index 51d8e23c2f..4ee2e9ac15 100644
---
a/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/resources/docker/ldap-login/application.yaml
+++
b/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/resources/docker/ldap-login/application.yaml
@@ -190,6 +190,8 @@ worker:
max-heartbeat-interval: 10s
# worker host weight to dispatch tasks, default value 100
host-weight: 100
+ # worker group name
+ group: default
server-load-protection:
enabled: true
# Worker max system cpu usage, when the worker's system cpu usage is
smaller then this value, worker server can be dispatched tasks.
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/operator/impl/WorkerGroupAuditOperatorImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/operator/impl/WorkerGroupAuditOperatorImpl.java
index f7a2642904..d30bcf396d 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/operator/impl/WorkerGroupAuditOperatorImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/operator/impl/WorkerGroupAuditOperatorImpl.java
@@ -23,7 +23,7 @@ import
org.apache.dolphinscheduler.api.audit.operator.BaseAuditOperator;
import org.apache.dolphinscheduler.common.enums.AuditOperationType;
import org.apache.dolphinscheduler.dao.entity.AuditLog;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
-import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
+import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao;
import java.util.List;
import java.util.Map;
@@ -35,7 +35,7 @@ import org.springframework.stereotype.Service;
public class WorkerGroupAuditOperatorImpl extends BaseAuditOperator {
@Autowired
- private WorkerGroupMapper workerGroupMapper;
+ private WorkerGroupDao workerGroupDao;
@Override
public void modifyAuditOperationType(AuditType auditType, Map<String,
Object> paramsMap,
@@ -54,7 +54,7 @@ public class WorkerGroupAuditOperatorImpl extends
BaseAuditOperator {
return "";
}
- WorkerGroup obj = workerGroupMapper.selectById(objId);
+ WorkerGroup obj = workerGroupDao.queryById(objId);
return obj == null ? "" : obj.getName();
}
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectWorkerGroupController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectWorkerGroupController.java
index 24bd3eddd9..81eba334e0 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectWorkerGroupController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectWorkerGroupController.java
@@ -90,15 +90,15 @@ public class ProjectWorkerGroupController extends
BaseController {
* @param projectCode project code
* @return worker group list
*/
- @Operation(summary = "queryWorkerGroups", description =
"QUERY_WORKER_GROUP_LIST")
+ @Operation(summary = "queryAssignedWorkerGroups", description =
"QUERY_WORKER_GROUP_LIST")
@Parameters({
@Parameter(name = "projectCode", description = "PROJECT_CODE",
schema = @Schema(implementation = long.class, example = "123456"))
})
@GetMapping()
@ResponseStatus(HttpStatus.OK)
- public Map<String, Object> queryWorkerGroups(@Parameter(hidden = true)
@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
- @Parameter(name =
"projectCode", description = "PROJECT_CODE", required = true) @PathVariable
long projectCode) {
- return
projectWorkerGroupRelationService.queryWorkerGroupsByProject(loginUser,
projectCode);
+ public Map<String, Object> queryAssignedWorkerGroups(@Parameter(hidden =
true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+ @Parameter(name =
"projectCode", description = "PROJECT_CODE", required = true) @PathVariable
long projectCode) {
+ return
projectWorkerGroupRelationService.queryAssignedWorkerGroupsByProject(loginUser,
projectCode);
}
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/permission/ResourcePermissionCheckServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/permission/ResourcePermissionCheckServiceImpl.java
index 91a4abfe4c..adfab607c6 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/permission/ResourcePermissionCheckServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/permission/ResourcePermissionCheckServiceImpl.java
@@ -42,8 +42,8 @@ import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.QueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
-import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.dao.repository.UserDao;
+import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Arrays;
@@ -273,10 +273,10 @@ public class ResourcePermissionCheckServiceImpl
@Component
public static class WorkerGroupResourcePermissionCheck implements
ResourceAcquisitionAndPermissionCheck<Integer> {
- private final WorkerGroupMapper workerGroupMapper;
+ private final WorkerGroupDao workerGroupDao;
- public WorkerGroupResourcePermissionCheck(WorkerGroupMapper
workerGroupMapper) {
- this.workerGroupMapper = workerGroupMapper;
+ public WorkerGroupResourcePermissionCheck(WorkerGroupDao
workerGroupDao) {
+ this.workerGroupDao = workerGroupDao;
}
@Override
@@ -291,7 +291,7 @@ public class ResourcePermissionCheckServiceImpl
@Override
public Set<Integer> listAuthorizedResourceIds(int userId, Logger
logger) {
- List<WorkerGroup> workerGroups =
workerGroupMapper.queryAllWorkerGroup();
+ List<WorkerGroup> workerGroups =
workerGroupDao.queryAllWorkerGroup();
return
workerGroups.stream().map(WorkerGroup::getId).collect(Collectors.toSet());
}
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectWorkerGroupRelationService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectWorkerGroupRelationService.java
index 5b809f8d61..008f8769ff 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectWorkerGroupRelationService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectWorkerGroupRelationService.java
@@ -23,9 +23,6 @@ import org.apache.dolphinscheduler.dao.entity.User;
import java.util.List;
import java.util.Map;
-/**
- * the service of project and worker group
- */
public interface ProjectWorkerGroupRelationService {
/**
@@ -43,6 +40,6 @@ public interface ProjectWorkerGroupRelationService {
* @param loginUser the login user
* @param projectCode project code
*/
- Map<String, Object> queryWorkerGroupsByProject(User loginUser, Long
projectCode);
+ Map<String, Object> queryAssignedWorkerGroupsByProject(User loginUser,
Long projectCode);
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
index 13cd3c3e35..575d103be2 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
+import org.apache.dolphinscheduler.dao.entity.WorkerGroupPageDetail;
import java.util.List;
import java.util.Map;
@@ -79,4 +80,6 @@ public interface WorkerGroupService {
*/
Map<Long, String> queryWorkerGroupByWorkflowDefinitionCodes(List<Long>
workflowDefinitionCodeList);
+ List<WorkerGroupPageDetail> getConfigWorkerGroupPageDetail();
+
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java
index c11915667f..3e6e4617f5 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java
@@ -21,23 +21,23 @@ import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProjectService;
import
org.apache.dolphinscheduler.api.service.ProjectWorkerGroupRelationService;
+import org.apache.dolphinscheduler.api.service.WorkerGroupService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ProjectWorkerGroup;
import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProjectWorkerGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
-import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
-import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
+import org.apache.dolphinscheduler.dao.repository.ProjectWorkerGroupDao;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections4.SetUtils;
import org.apache.commons.lang3.StringUtils;
+import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -53,11 +53,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-
-/**
- * task definition service impl
- */
@Service
@Slf4j
public class ProjectWorkerGroupRelationServiceImpl extends BaseServiceImpl
@@ -65,16 +60,13 @@ public class ProjectWorkerGroupRelationServiceImpl extends
BaseServiceImpl
ProjectWorkerGroupRelationService {
@Autowired
- private ProjectWorkerGroupMapper projectWorkerGroupMapper;
+ private ProjectWorkerGroupDao projectWorkerGroupDao;
@Autowired
private ProjectMapper projectMapper;
@Autowired
- private WorkerGroupMapper workerGroupMapper;
-
- @Autowired
- private TaskDefinitionMapper taskDefinitionMapper;
+ private TaskDefinitionDao taskDefinitionDao;
@Autowired
private ScheduleMapper scheduleMapper;
@@ -82,16 +74,14 @@ public class ProjectWorkerGroupRelationServiceImpl extends
BaseServiceImpl
@Autowired
private ProjectService projectService;
- /**
- * assign worker groups to a project
- *
- * @param loginUser the login user
- * @param projectCode the project code
- * @param workerGroups assigned worker group names
- */
+ @Autowired
+ private WorkerGroupDao workerGroupDao;
+
+ @Autowired
+ private WorkerGroupService workerGroupService;
+
@Override
public Result assignWorkerGroupsToProject(User loginUser, Long
projectCode, List<String> workerGroups) {
-
Result result = new Result();
if (!isAdmin(loginUser)) {
@@ -105,7 +95,12 @@ public class ProjectWorkerGroupRelationServiceImpl extends
BaseServiceImpl
}
if (CollectionUtils.isEmpty(workerGroups)) {
- putMsg(result, Status.WORKER_GROUP_TO_PROJECT_IS_EMPTY);
+ boolean deleted =
projectWorkerGroupDao.deleteByProjectCode(projectCode);
+ if (deleted) {
+ putMsg(result, Status.SUCCESS);
+ } else {
+ putMsg(result, Status.ASSIGN_WORKER_GROUP_TO_PROJECT_ERROR);
+ }
return result;
}
@@ -115,30 +110,23 @@ public class ProjectWorkerGroupRelationServiceImpl
extends BaseServiceImpl
return result;
}
- Set<String> workerGroupNames =
-
workerGroupMapper.queryAllWorkerGroup().stream().map(WorkerGroup::getName).collect(
- Collectors.toSet());
-
- workerGroupNames.add(WorkerGroupUtils.getDefaultWorkerGroup());
-
- Set<String> assignedWorkerGroupNames = new HashSet<>(workerGroups);
-
- Set<String> difference = SetUtils.difference(assignedWorkerGroupNames,
workerGroupNames);
+ Set<String> allWorkerGroupNames = new
HashSet<>(workerGroupDao.queryAllWorkerGroupNames());
+ workerGroupService.getConfigWorkerGroupPageDetail().forEach(
+ workerGroupPageDetail ->
allWorkerGroupNames.add(workerGroupPageDetail.getName()));
+ Set<String> unauthorizedWorkerGroupNames = new HashSet<>(workerGroups);
+ // check if assign worker group exists in the system
+ Set<String> difference =
SetUtils.difference(unauthorizedWorkerGroupNames, allWorkerGroupNames);
if (!difference.isEmpty()) {
putMsg(result, Status.WORKER_GROUP_NOT_EXIST,
difference.toString());
return result;
}
- Set<String> projectWorkerGroupNames =
projectWorkerGroupMapper.selectList(new QueryWrapper<ProjectWorkerGroup>()
- .lambda()
- .eq(ProjectWorkerGroup::getProjectCode, projectCode))
- .stream()
- .map(ProjectWorkerGroup::getWorkerGroup)
- .collect(Collectors.toSet());
-
- difference = SetUtils.difference(projectWorkerGroupNames,
assignedWorkerGroupNames);
-
+ // check if assign worker group exists in the project
+ Set<String> projectWorkerGroupNames =
+
projectWorkerGroupDao.queryAssignedWorkerGroupNamesByProjectCode(projectCode);
+ difference = SetUtils.difference(unauthorizedWorkerGroupNames,
projectWorkerGroupNames);
+ Date now = new Date();
if (CollectionUtils.isNotEmpty(difference)) {
Set<String> usedWorkerGroups = getAllUsedWorkerGroups(project);
@@ -147,27 +135,22 @@ public class ProjectWorkerGroupRelationServiceImpl
extends BaseServiceImpl
SetUtils.intersection(usedWorkerGroups,
difference).toSet());
}
- int deleted = projectWorkerGroupMapper.delete(
- new
QueryWrapper<ProjectWorkerGroup>().lambda().eq(ProjectWorkerGroup::getProjectCode,
projectCode)
- .in(ProjectWorkerGroup::getWorkerGroup,
difference));
- if (deleted > 0) {
+ boolean deleted =
+
projectWorkerGroupDao.deleteByProjectCodeAndWorkerGroups(projectCode, new
ArrayList<>(difference));
+ if (deleted) {
log.info("Success to delete worker groups [{}] for the project
[{}] .", difference, project.getName());
} else {
log.error("Failed to delete worker groups [{}] for the project
[{}].", difference, project.getName());
throw new
ServiceException(Status.ASSIGN_WORKER_GROUP_TO_PROJECT_ERROR);
}
- }
- difference = SetUtils.difference(assignedWorkerGroupNames,
projectWorkerGroupNames);
- Date now = new Date();
- if (CollectionUtils.isNotEmpty(difference)) {
- difference.stream().forEach(workerGroupName -> {
+ difference.forEach(workerGroupName -> {
ProjectWorkerGroup projectWorkerGroup = new
ProjectWorkerGroup();
projectWorkerGroup.setProjectCode(projectCode);
projectWorkerGroup.setWorkerGroup(workerGroupName);
projectWorkerGroup.setCreateTime(now);
projectWorkerGroup.setUpdateTime(now);
- int create =
projectWorkerGroupMapper.insert(projectWorkerGroup);
+ int create = projectWorkerGroupDao.insert(projectWorkerGroup);
if (create > 0) {
log.info("Success to add worker group [{}] for the project
[{}] .", workerGroupName,
project.getName());
@@ -183,13 +166,8 @@ public class ProjectWorkerGroupRelationServiceImpl extends
BaseServiceImpl
return result;
}
- /**
- * query worker groups that assigned to the project
- *
- * @param projectCode project code
- */
@Override
- public Map<String, Object> queryWorkerGroupsByProject(User loginUser, Long
projectCode) {
+ public Map<String, Object> queryAssignedWorkerGroupsByProject(User
loginUser, Long projectCode) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByCode(projectCode);
@@ -201,9 +179,8 @@ public class ProjectWorkerGroupRelationServiceImpl extends
BaseServiceImpl
Set<String> assignedWorkerGroups = getAllUsedWorkerGroups(project);
- projectWorkerGroupMapper.selectList(
- new
QueryWrapper<ProjectWorkerGroup>().lambda().eq(ProjectWorkerGroup::getProjectCode,
projectCode))
- .stream().forEach(projectWorkerGroup ->
assignedWorkerGroups.add(projectWorkerGroup.getWorkerGroup()));
+ projectWorkerGroupDao.queryByProjectCode(projectCode)
+ .forEach(projectWorkerGroup ->
assignedWorkerGroups.add(projectWorkerGroup.getWorkerGroup()));
List<ProjectWorkerGroup> projectWorkerGroups =
assignedWorkerGroups.stream().map(workerGroup -> {
ProjectWorkerGroup projectWorkerGroup = new ProjectWorkerGroup();
@@ -220,9 +197,9 @@ public class ProjectWorkerGroupRelationServiceImpl extends
BaseServiceImpl
private Set<String> getAllUsedWorkerGroups(Project project) {
Set<String> usedWorkerGroups = new TreeSet<>();
// query all worker groups that tasks depend on
-
taskDefinitionMapper.queryAllDefinitionList(project.getCode()).stream().forEach(taskDefinition
-> {
- if (StringUtils.isNotEmpty(taskDefinition.getWorkerGroup())) {
- usedWorkerGroups.add(taskDefinition.getWorkerGroup());
+
taskDefinitionDao.queryAllTaskDefinitionWorkerGroups(project.getCode()).forEach(workerGroupName
-> {
+ if (StringUtils.isNotEmpty(workerGroupName)) {
+ usedWorkerGroups.add(workerGroupName);
}
});
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index 4173a0c9e4..52db3bb7b0 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -28,22 +28,25 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
+import org.apache.dolphinscheduler.common.enums.WorkerGroupSource;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
+import org.apache.dolphinscheduler.dao.entity.WorkerGroupPageDetail;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import
org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
-import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
+import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.master.IMasterContainerService;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
@@ -76,7 +79,7 @@ import com.google.common.base.Strings;
public class WorkerGroupServiceImpl extends BaseServiceImpl implements
WorkerGroupService {
@Autowired
- private WorkerGroupMapper workerGroupMapper;
+ private WorkerGroupDao workerGroupDao;
@Autowired
private WorkflowInstanceMapper workflowInstanceMapper;
@@ -100,9 +103,9 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl
implements WorkerGro
* create or update a worker group
*
* @param loginUser login user
- * @param id worker group id
- * @param name worker group name
- * @param addrList addr list
+ * @param id worker group id
+ * @param name worker group name
+ * @param addrList addr list
* @return create or update result code
*/
@Override
@@ -131,9 +134,9 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl
implements WorkerGro
workerGroup.setAddrList(addrList);
workerGroup.setUpdateTime(now);
workerGroup.setDescription(description);
- workerGroupMapper.insert(workerGroup);
+ workerGroupDao.insert(workerGroup);
} else {
- workerGroup = workerGroupMapper.selectById(id);
+ workerGroup = workerGroupDao.queryById(id);
if (workerGroup == null) {
throw new ServiceException(Status.WORKER_GROUP_NOT_EXIST,
id);
}
@@ -145,7 +148,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl
implements WorkerGro
workerGroup.setAddrList(addrList);
workerGroup.setUpdateTime(now);
workerGroup.setDescription(description);
- workerGroupMapper.updateById(workerGroup);
+ workerGroupDao.updateById(workerGroup);
log.info("Update worker group: {} success .", workerGroup);
}
boardCastToMasterThatWorkerGroupChanged();
@@ -219,9 +222,9 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl
implements WorkerGro
* query worker group paging
*
* @param loginUser login user
- * @param pageNo page number
+ * @param pageNo page number
* @param searchVal search value
- * @param pageSize page size
+ * @param pageSize page size
* @return worker group list page
*/
@Override
@@ -232,28 +235,29 @@ public class WorkerGroupServiceImpl extends
BaseServiceImpl implements WorkerGro
int toIndex = (pageNo - 1) * pageSize + pageSize;
Result result = new Result();
- List<WorkerGroup> workerGroups;
+ List<WorkerGroupPageDetail> workerGroupPageDetails;
if (loginUser.getUserType().equals(UserType.ADMIN_USER)) {
- workerGroups = getWorkerGroups(null);
+ workerGroupPageDetails = getUiWorkerGroupPageDetails(null);
} else {
Set<Integer> ids = resourcePermissionCheckService
.userOwnedResourceIdsAcquisition(AuthorizationType.WORKER_GROUP,
loginUser.getId(), log);
- workerGroups = getWorkerGroups(ids.isEmpty() ?
Collections.emptyList() : new ArrayList<>(ids));
+ workerGroupPageDetails =
+ getUiWorkerGroupPageDetails(ids.isEmpty() ?
Collections.emptyList() : new ArrayList<>(ids));
}
- List<WorkerGroup> resultDataList = new ArrayList<>();
+ List<WorkerGroupPageDetail> resultDataList = new ArrayList<>();
int total = 0;
- if (CollectionUtils.isNotEmpty(workerGroups)) {
- List<WorkerGroup> searchValDataList = new ArrayList<>();
+ if (CollectionUtils.isNotEmpty(workerGroupPageDetails)) {
+ List<WorkerGroupPageDetail> searchValDataList = new ArrayList<>();
if (!StringUtils.isEmpty(searchVal)) {
- for (WorkerGroup workerGroup : workerGroups) {
+ for (WorkerGroupPageDetail workerGroup :
workerGroupPageDetails) {
if (workerGroup.getName().contains(searchVal)) {
searchValDataList.add(workerGroup);
}
}
} else {
- searchValDataList = workerGroups;
+ searchValDataList = workerGroupPageDetails;
}
total = searchValDataList.size();
if (fromIndex < searchValDataList.size()) {
@@ -263,10 +267,12 @@ public class WorkerGroupServiceImpl extends
BaseServiceImpl implements WorkerGro
resultDataList = searchValDataList.subList(fromIndex, toIndex);
}
}
+ List<WorkerGroupPageDetail> configWorkerGroupPageDetails =
getConfigWorkerGroupPageDetail();
+ configWorkerGroupPageDetails.addAll(resultDataList);
- PageInfo<WorkerGroup> pageInfo = new PageInfo<>(pageNo, pageSize);
+ PageInfo<WorkerGroupPageDetail> pageInfo = new PageInfo<>(pageNo,
pageSize);
pageInfo.setTotal(total);
- pageInfo.setTotalList(resultDataList);
+ pageInfo.setTotalList(configWorkerGroupPageDetails);
result.setData(pageInfo);
putMsg(result, Status.SUCCESS);
@@ -282,50 +288,40 @@ public class WorkerGroupServiceImpl extends
BaseServiceImpl implements WorkerGro
@Override
public Map<String, Object> queryAllGroup(User loginUser) {
Map<String, Object> result = new HashMap<>();
- List<WorkerGroup> workerGroups;
+ List<WorkerGroupPageDetail> workerGroups;
if (loginUser.getUserType().equals(UserType.ADMIN_USER)) {
- workerGroups = getWorkerGroups(null);
+ workerGroups = getUiWorkerGroupPageDetails(null);
} else {
Set<Integer> ids = resourcePermissionCheckService
.userOwnedResourceIdsAcquisition(AuthorizationType.WORKER_GROUP,
loginUser.getId(), log);
- workerGroups = getWorkerGroups(ids.isEmpty() ?
Collections.emptyList() : new ArrayList<>(ids));
+ workerGroups = getUiWorkerGroupPageDetails(ids.isEmpty() ?
Collections.emptyList() : new ArrayList<>(ids));
}
+ List<String> configWorkerGroupNames =
getConfigWorkerGroupPageDetail().stream()
+ .map(WorkerGroupPageDetail::getName)
+ .collect(Collectors.toList());
List<String> availableWorkerGroupList = workerGroups.stream()
.map(WorkerGroup::getName)
.collect(Collectors.toList());
+ availableWorkerGroupList.addAll(configWorkerGroupNames);
result.put(Constants.DATA_LIST, availableWorkerGroupList);
putMsg(result, Status.SUCCESS);
return result;
}
- /**
- * get worker groups
- *
- * @return WorkerGroup list
- */
- private List<WorkerGroup> getWorkerGroups(List<Integer> ids) {
- // worker groups from database
+ private List<WorkerGroupPageDetail>
getUiWorkerGroupPageDetails(List<Integer> ids) {
List<WorkerGroup> workerGroups;
if (ids != null) {
- workerGroups = ids.isEmpty() ? new ArrayList<>() :
workerGroupMapper.selectBatchIds(ids);
+ workerGroups = ids.isEmpty() ? new ArrayList<>() :
workerGroupDao.queryByIds(ids);
} else {
- workerGroups = workerGroupMapper.queryAllWorkerGroup();
- }
- boolean containDefaultWorkerGroups = workerGroups.stream()
- .anyMatch(workerGroup ->
WorkerGroupUtils.isWorkerGroupEmpty(workerGroup.getName()));
- if (!containDefaultWorkerGroups) {
- // there doesn't exist a default WorkerGroup, we will add all
worker to the default worker group.
- Set<String> activeWorkerNodes =
registryClient.getServerNodeSet(RegistryNodeType.WORKER);
- WorkerGroup defaultWorkerGroup = new WorkerGroup();
-
defaultWorkerGroup.setName(WorkerGroupUtils.getDefaultWorkerGroup());
- defaultWorkerGroup.setAddrList(String.join(Constants.COMMA,
activeWorkerNodes));
- defaultWorkerGroup.setCreateTime(new Date());
- defaultWorkerGroup.setUpdateTime(new Date());
- defaultWorkerGroup.setSystemDefault(true);
- workerGroups.add(defaultWorkerGroup);
+ workerGroups = workerGroupDao.queryAllWorkerGroup();
}
-
- return workerGroups;
+ return workerGroups.stream()
+ .map(workerGroup -> {
+ WorkerGroupPageDetail workerGroupPageDetail = new
WorkerGroupPageDetail(workerGroup);
+ workerGroupPageDetail.setSource(WorkerGroupSource.UI);
+ workerGroupPageDetail.setSystemDefault(false);
+ return workerGroupPageDetail;
+ }).collect(Collectors.toList());
}
/**
@@ -342,7 +338,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl
implements WorkerGro
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
- WorkerGroup workerGroup = workerGroupMapper.selectById(id);
+ WorkerGroup workerGroup = workerGroupDao.queryById(id);
if (workerGroup == null) {
log.error("Worker group does not exist, workerGroupId:{}.", id);
putMsg(result, Status.DELETE_WORKER_GROUP_NOT_EXIST);
@@ -365,7 +361,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl
implements WorkerGro
return result;
}
- workerGroupMapper.deleteById(id);
+ workerGroupDao.deleteById(id);
log.info("Delete worker group complete, workerGroupName:{}.",
workerGroup.getName());
putMsg(result, Status.SUCCESS);
@@ -402,7 +398,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl
implements WorkerGro
for (Server master : masters) {
try {
Clients.withService(IMasterContainerService.class)
- .withHost(master.getHost() + ":" + master.getPort())
+ .withHost(master.getHost() + Constants.COLON +
master.getPort())
.refreshWorkerGroup();
} catch (Exception e) {
log.error("Broadcast to master: {} that worker group changed
failed", master, e);
@@ -410,4 +406,21 @@ public class WorkerGroupServiceImpl extends
BaseServiceImpl implements WorkerGro
}
}
+ @Override
+ public List<WorkerGroupPageDetail> getConfigWorkerGroupPageDetail() {
+ List<WorkerGroupPageDetail> workerGroupPageDetails = new ArrayList<>();
+ registryClient.getServerList(RegistryNodeType.WORKER).forEach(server
-> {
+ WorkerGroupPageDetail workerGroupPageDetail = new
WorkerGroupPageDetail();
+ WorkerHeartBeat workerHeartBeat =
JSONUtils.parseObject(server.getHeartBeatInfo(), WorkerHeartBeat.class);
+ workerGroupPageDetail.setName(workerHeartBeat.getWorkerGroup());
+ workerGroupPageDetail.setAddrList(workerHeartBeat.getHost() +
Constants.COLON + workerHeartBeat.getPort());
+ workerGroupPageDetail.setSource(WorkerGroupSource.CONFIG);
+
workerGroupPageDetail.setCreateTime(DateUtils.timeStampToDate(workerHeartBeat.getStartupTime()));
+
workerGroupPageDetail.setUpdateTime(DateUtils.timeStampToDate(workerHeartBeat.getReportTime()));
+ workerGroupPageDetail.setSystemDefault(true);
+ workerGroupPageDetails.add(workerGroupPageDetail);
+ });
+ return workerGroupPageDetails;
+ }
+
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
index 6234088ce5..06ac58fde5 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
@@ -27,8 +27,8 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
-import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
+import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
@@ -53,8 +53,8 @@ public class WorkerGroupControllerTest extends
AbstractControllerTest {
private static final Logger logger =
LoggerFactory.getLogger(WorkerGroupControllerTest.class);
- @MockBean(name = "workerGroupMapper")
- private WorkerGroupMapper workerGroupMapper;
+ @MockBean(name = "workerGroupDao")
+ private WorkerGroupDao workerGroupDao;
@MockBean(name = "processInstanceMapper")
private WorkflowInstanceMapper workflowInstanceMapper;
@@ -133,11 +133,11 @@ public class WorkerGroupControllerTest extends
AbstractControllerTest {
WorkerGroup workerGroup = new WorkerGroup();
workerGroup.setId(12);
workerGroup.setName("测试");
- Mockito.when(workerGroupMapper.selectById(12)).thenReturn(workerGroup);
+ Mockito.when(workerGroupDao.queryById(12)).thenReturn(workerGroup);
Mockito.when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus("测试",
WorkflowExecutionStatus.getNotTerminalStatus()))
.thenReturn(null);
- Mockito.when(workerGroupMapper.deleteById(12)).thenReturn(1);
+ Mockito.when(workerGroupDao.deleteById(12)).thenReturn(true);
Mockito.when(workflowInstanceMapper.updateWorkflowInstanceByWorkerGroupName("测试",
"")).thenReturn(1);
MvcResult mvcResult = mockMvc.perform(delete("/worker-groups/{id}",
"12")
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/permission/WorkerGroupResourcePermissionCheckTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/permission/WorkerGroupResourcePermissionCheckTest.java
index 6f4df9ca72..0f86f22277 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/permission/WorkerGroupResourcePermissionCheckTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/permission/WorkerGroupResourcePermissionCheckTest.java
@@ -21,7 +21,7 @@ import
org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
-import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
+import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao;
import java.util.Arrays;
import java.util.Collections;
@@ -47,7 +47,7 @@ public class WorkerGroupResourcePermissionCheckTest {
private
ResourcePermissionCheckServiceImpl.WorkerGroupResourcePermissionCheck
workerGroupResourcePermissionCheck;
@Mock
- private WorkerGroupMapper workerGroupMapper;
+ private WorkerGroupDao workerGroupDao;
@Test
public void testPermissionCheck() {
@@ -69,7 +69,7 @@ public class WorkerGroupResourcePermissionCheckTest {
ids.add(workerGroup.getId());
List<WorkerGroup> workerGroups = Arrays.asList(workerGroup);
-
Mockito.when(workerGroupMapper.queryAllWorkerGroup()).thenReturn(workerGroups);
+
Mockito.when(workerGroupDao.queryAllWorkerGroup()).thenReturn(workerGroups);
Assertions.assertEquals(ids,
workerGroupResourcePermissionCheck.listAuthorizedResourceIds(user.getId(),
logger));
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/MonitorServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/MonitorServiceTest.java
index e1318e936d..e9eff1ecee 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/MonitorServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/MonitorServiceTest.java
@@ -131,7 +131,7 @@ public class MonitorServiceTest {
Server server = new Server();
server.setId(1);
server.setHost("127.0.0.1");
- server.setZkDirectory("ws/server");
+ server.setServerDirectory("ws/server");
server.setPort(123);
server.setCreateTime(new Date());
server.setLastHeartbeatTime(new Date());
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProjectWorkerGroupRelationServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProjectWorkerGroupRelationServiceTest.java
index 6f6b7ca2d6..70afee9641 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProjectWorkerGroupRelationServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProjectWorkerGroupRelationServiceTest.java
@@ -30,13 +30,15 @@ import
org.apache.dolphinscheduler.dao.entity.ProjectWorkerGroup;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
+import org.apache.dolphinscheduler.dao.entity.WorkerGroupPageDetail;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProjectWorkerGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
-import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
+import org.apache.dolphinscheduler.dao.repository.ProjectWorkerGroupDao;
+import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
+import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -52,6 +54,7 @@ import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
@@ -60,20 +63,23 @@ public class ProjectWorkerGroupRelationServiceTest {
@InjectMocks
private ProjectWorkerGroupRelationServiceImpl
projectWorkerGroupRelationService;
+ @Mock
+ private WorkerGroupService workerGroupService;
+
@Mock
private ProjectMapper projectMapper;
@Mock
- private ProjectWorkerGroupMapper projectWorkerGroupMapper;
+ private ProjectWorkerGroupDao projectWorkerGroupDao;
@Mock
- private WorkerGroupMapper workerGroupMapper;
+ private WorkerGroupDao workerGroupDao;
@Mock
private ProjectService projectService;
@Mock
- private TaskDefinitionMapper taskDefinitionMapper;
+ private TaskDefinitionDao taskDefinitionDao;
@Mock
private ScheduleMapper scheduleMapper;
@@ -95,11 +101,6 @@ public class ProjectWorkerGroupRelationServiceTest {
getWorkerGroups());
Assertions.assertEquals(Status.PROJECT_NOT_EXIST.getCode(),
result.getCode());
- // worker group is empty
- result =
projectWorkerGroupRelationService.assignWorkerGroupsToProject(loginUser,
projectCode,
- Collections.emptyList());
-
Assertions.assertEquals(Status.WORKER_GROUP_TO_PROJECT_IS_EMPTY.getCode(),
result.getCode());
-
// project not exists
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(null);
result =
projectWorkerGroupRelationService.assignWorkerGroupsToProject(loginUser,
projectCode,
@@ -109,56 +110,57 @@ public class ProjectWorkerGroupRelationServiceTest {
// worker group not exists
WorkerGroup workerGroup = new WorkerGroup();
workerGroup.setName("test");
+ WorkerGroupPageDetail workerGroupPageDetail = new
WorkerGroupPageDetail();
+ workerGroupPageDetail.setName("test1");
Mockito.when(projectMapper.queryByCode(Mockito.anyLong())).thenReturn(getProject());
-
Mockito.when(workerGroupMapper.queryAllWorkerGroup()).thenReturn(Collections.singletonList(workerGroup));
+
Mockito.when(workerGroupDao.queryAllWorkerGroup()).thenReturn(Collections.singletonList(workerGroup));
+ Mockito.when(workerGroupService.getConfigWorkerGroupPageDetail())
+ .thenReturn(Collections.singletonList(workerGroupPageDetail));
result =
projectWorkerGroupRelationService.assignWorkerGroupsToProject(loginUser,
projectCode,
getDiffWorkerGroups());
Assertions.assertEquals(Status.WORKER_GROUP_NOT_EXIST.getCode(),
result.getCode());
- // db insertion fail
-
Mockito.when(workerGroupMapper.queryAllWorkerGroup()).thenReturn(Collections.singletonList(workerGroup));
-
Mockito.when(projectWorkerGroupMapper.insert(Mockito.any())).thenReturn(-1);
-
AssertionsHelper.assertThrowsServiceException(Status.ASSIGN_WORKER_GROUP_TO_PROJECT_ERROR,
- () ->
projectWorkerGroupRelationService.assignWorkerGroupsToProject(loginUser,
projectCode,
- getWorkerGroups()));
-
// success
-
Mockito.when(projectWorkerGroupMapper.insert(Mockito.any())).thenReturn(1);
-
+
Mockito.when(workerGroupDao.queryAllWorkerGroupNames()).thenReturn(getWorkerGroups());
+
Mockito.when(projectWorkerGroupDao.deleteByProjectCodeAndWorkerGroups(Mockito.any(),
Mockito.any()))
+ .thenReturn(true);
+
Mockito.when(projectWorkerGroupDao.insert(Mockito.any())).thenReturn(1);
result =
projectWorkerGroupRelationService.assignWorkerGroupsToProject(loginUser,
projectCode,
getWorkerGroups());
Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode());
// success when there is diff between current wg and assigned wg
- Mockito.when(projectWorkerGroupMapper.selectList(Mockito.any()))
-
.thenReturn(Collections.singletonList(getDiffProjectWorkerGroup()));
-
Mockito.when(projectWorkerGroupMapper.delete(Mockito.any())).thenReturn(1);
+
Mockito.when(projectWorkerGroupDao.queryAssignedWorkerGroupNamesByProjectCode(Mockito.any()))
+
.thenReturn(Sets.newHashSet(getDiffProjectWorkerGroup().getWorkerGroup()));
+
Mockito.when(projectWorkerGroupDao.deleteByProjectCodeAndWorkerGroups(getProjectWorkerGroup().getProjectCode(),
+
Collections.singletonList(getDiffProjectWorkerGroup().getWorkerGroup()))).thenReturn(true);
result =
projectWorkerGroupRelationService.assignWorkerGroupsToProject(loginUser,
projectCode,
getWorkerGroups());
Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode());
// db deletion fail
-
Mockito.when(projectWorkerGroupMapper.delete(Mockito.any())).thenReturn(-1);
+
Mockito.when(projectWorkerGroupDao.deleteByProjectCodeAndWorkerGroups(Mockito.any(),
Mockito.any()))
+ .thenReturn(false);
AssertionsHelper.assertThrowsServiceException(Status.ASSIGN_WORKER_GROUP_TO_PROJECT_ERROR,
() ->
projectWorkerGroupRelationService.assignWorkerGroupsToProject(loginUser,
projectCode,
getWorkerGroups()));
// fail when wg is referenced by task definition
-
Mockito.when(taskDefinitionMapper.queryAllDefinitionList(Mockito.anyLong()))
-
.thenReturn(Collections.singletonList(getTaskDefinitionWithDiffWorkerGroup()));
+
Mockito.when(taskDefinitionDao.queryAllTaskDefinitionWorkerGroups(Mockito.anyLong()))
+
.thenReturn(Collections.singletonList(getProjectWorkerGroup().getWorkerGroup()));
AssertionsHelper.assertThrowsServiceException(Status.USED_WORKER_GROUP_EXISTS,
() ->
projectWorkerGroupRelationService.assignWorkerGroupsToProject(loginUser,
projectCode,
getWorkerGroups()));
}
@Test
- public void testQueryWorkerGroupsByProject() {
+ public void testQueryAssignedWorkerGroupsByProject() {
// no permission
Mockito.when(projectService.hasProjectAndPerm(Mockito.any(),
Mockito.any(), Mockito.anyMap(), Mockito.any()))
.thenReturn(false);
Map<String, Object> result =
-
projectWorkerGroupRelationService.queryWorkerGroupsByProject(getGeneralUser(),
projectCode);
+
projectWorkerGroupRelationService.queryAssignedWorkerGroupsByProject(getGeneralUser(),
projectCode);
Assertions.assertTrue(result.isEmpty());
@@ -169,29 +171,29 @@ public class ProjectWorkerGroupRelationServiceTest {
Mockito.when(projectMapper.queryByCode(projectCode))
.thenReturn(getProject());
- Mockito.when(projectWorkerGroupMapper.selectList(Mockito.any()))
- .thenReturn(Lists.newArrayList(getProjectWorkerGroup()));
+ Mockito.when(projectWorkerGroupDao.queryByProjectCode(Mockito.any()))
+
.thenReturn(Collections.singletonList(getProjectWorkerGroup()));
-
Mockito.when(taskDefinitionMapper.queryAllDefinitionList(Mockito.anyLong()))
+
Mockito.when(taskDefinitionDao.queryAllTaskDefinitionWorkerGroups(Mockito.anyLong()))
.thenReturn(new ArrayList<>());
Mockito.when(scheduleMapper.querySchedulerListByProjectName(Mockito.any()))
.thenReturn(Lists.newArrayList());
- result =
projectWorkerGroupRelationService.queryWorkerGroupsByProject(getGeneralUser(),
projectCode);
+ result =
projectWorkerGroupRelationService.queryAssignedWorkerGroupsByProject(getGeneralUser(),
projectCode);
ProjectWorkerGroup[] actualValue =
((List<ProjectWorkerGroup>)
result.get(Constants.DATA_LIST)).toArray(new ProjectWorkerGroup[0]);
-
+ System.out.println(Arrays.toString(actualValue));
Assertions.assertEquals(actualValue[0].getWorkerGroup(),
getProjectWorkerGroup().getWorkerGroup());
}
private List<String> getWorkerGroups() {
- return Lists.newArrayList("default");
+ return Lists.newArrayList("test");
}
private List<String> getDiffWorkerGroups() {
- return Lists.newArrayList("default", "new");
+ return Lists.newArrayList("test3", "new");
}
private Project getProject() {
@@ -207,7 +209,7 @@ public class ProjectWorkerGroupRelationServiceTest {
ProjectWorkerGroup projectWorkerGroup = new ProjectWorkerGroup();
projectWorkerGroup.setId(1);
projectWorkerGroup.setProjectCode(projectCode);
- projectWorkerGroup.setWorkerGroup("default");
+ projectWorkerGroup.setWorkerGroup("test");
return projectWorkerGroup;
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
index 86ea324fd6..01a140635e 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
@@ -39,8 +39,8 @@ import
org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import
org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
+import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
@@ -76,7 +76,7 @@ public class WorkerGroupServiceTest {
private WorkerGroupServiceImpl workerGroupService;
@Mock
- private WorkerGroupMapper workerGroupMapper;
+ private WorkerGroupDao workerGroupDao;
@Mock
private WorkflowInstanceMapper workflowInstanceMapper;
@@ -142,7 +142,7 @@ public class WorkerGroupServiceTest {
serverMaps.put("localhost:0000", "");
when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps);
-
when(workerGroupMapper.insert(Mockito.any())).thenThrow(DuplicateKeyException.class);
+
when(workerGroupDao.insert(Mockito.any())).thenThrow(DuplicateKeyException.class);
assertThrowsServiceException(Status.NAME_EXIST, () -> {
workerGroupService.saveWorkerGroup(loginUser, 0, GROUP_NAME,
"localhost:0000", "test group");
});
@@ -155,8 +155,8 @@ public class WorkerGroupServiceTest {
WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true);
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP,
null, 1,
baseServiceLogger)).thenReturn(true);
- when(workerGroupMapper.selectById(1)).thenReturn(null);
-
when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null);
+ when(workerGroupDao.queryById(1)).thenReturn(null);
+
when(workerGroupDao.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null);
Map<String, String> serverMaps = new HashMap<>();
serverMaps.put("localhost1:0000", "");
when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps);
@@ -173,11 +173,11 @@ public class WorkerGroupServiceTest {
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP,
null, 1,
baseServiceLogger)).thenReturn(true);
-
when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null);
+
when(workerGroupDao.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null);
Map<String, String> serverMaps = new HashMap<>();
serverMaps.put("localhost:0000", "");
when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps);
- when(workerGroupMapper.insert(any())).thenReturn(1);
+ when(workerGroupDao.insert(any())).thenReturn(1);
assertDoesNotThrow(() -> {
workerGroupService.saveWorkerGroup(loginUser, 0, GROUP_NAME,
"localhost:0000", "test group");
});
@@ -192,7 +192,7 @@ public class WorkerGroupServiceTest {
workerGroups.add(getWorkerGroup(1));
when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.WORKER_GROUP,
loginUser.getId(), serviceLogger)).thenReturn(ids);
- when(workerGroupMapper.selectBatchIds(ids)).thenReturn(workerGroups);
+ when(workerGroupDao.queryByIds(ids)).thenReturn(workerGroups);
Set<String> activeWorkerNodes = new HashSet<>();
activeWorkerNodes.add("localhost:12345");
activeWorkerNodes.add("localhost:23456");
@@ -206,7 +206,7 @@ public class WorkerGroupServiceTest {
public void testQueryAllGroup() {
Map<String, Object> result =
workerGroupService.queryAllGroup(getLoginUser());
List<String> workerGroups = (List<String>)
result.get(Constants.DATA_LIST);
- Assertions.assertEquals(workerGroups.size(), 1);
+ Assertions.assertEquals(workerGroups.size(), 0);
}
@Test
@@ -216,7 +216,7 @@ public class WorkerGroupServiceTest {
WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true);
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP,
null, 1,
baseServiceLogger)).thenReturn(true);
- when(workerGroupMapper.selectById(1)).thenReturn(null);
+ when(workerGroupDao.queryById(1)).thenReturn(null);
Map<String, Object> notExistResult =
workerGroupService.deleteWorkerGroupById(loginUser, 1);
Assertions.assertEquals(Status.DELETE_WORKER_GROUP_NOT_EXIST.getCode(),
@@ -231,7 +231,7 @@ public class WorkerGroupServiceTest {
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP,
null, 1,
baseServiceLogger)).thenReturn(true);
WorkerGroup workerGroup = getWorkerGroup(1);
- when(workerGroupMapper.selectById(1)).thenReturn(workerGroup);
+ when(workerGroupDao.queryById(1)).thenReturn(workerGroup);
WorkflowInstance workflowInstance = new WorkflowInstance();
workflowInstance.setId(1);
List<WorkflowInstance> workflowInstances = new
ArrayList<WorkflowInstance>();
@@ -253,11 +253,11 @@ public class WorkerGroupServiceTest {
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP,
null, 1,
baseServiceLogger)).thenReturn(true);
WorkerGroup workerGroup = getWorkerGroup(1);
- when(workerGroupMapper.selectById(1)).thenReturn(workerGroup);
+ when(workerGroupDao.queryById(1)).thenReturn(workerGroup);
when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(),
WorkflowExecutionStatus.getNotTerminalStatus())).thenReturn(null);
- when(workerGroupMapper.deleteById(1)).thenReturn(1);
+ when(workerGroupDao.deleteById(1)).thenReturn(true);
when(environmentWorkerGroupRelationMapper.queryByWorkerGroupName(workerGroup.getName()))
.thenReturn(null);
@@ -275,8 +275,7 @@ public class WorkerGroupServiceTest {
public void testQueryAllGroupWithDefault() {
Map<String, Object> result =
workerGroupService.queryAllGroup(getLoginUser());
List<String> workerGroups = (List<String>)
result.get(Constants.DATA_LIST);
- Assertions.assertEquals(1, workerGroups.size());
- Assertions.assertEquals("default", workerGroups.toArray()[0]);
+ Assertions.assertEquals(0, workerGroups.size());
}
/**
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkerGroupSource.java
similarity index 67%
copy from
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java
copy to
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkerGroupSource.java
index 873a69bcaf..9c7f083b26 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkerGroupSource.java
@@ -15,30 +15,25 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.model;
+package org.apache.dolphinscheduler.common.enums;
-import java.util.Date;
+import lombok.Getter;
-import lombok.Data;
+import com.baomidou.mybatisplus.annotation.EnumValue;
-@Data
-public class Server {
+@Getter
+public enum WorkerGroupSource {
- private int id;
+ CONFIG(1, "config"),
+ UI(2, "ui");
- private String host;
+ @EnumValue
+ private final int code;
+ private final String desc;
- private int port;
-
- private String zkDirectory;
-
- /**
- * resource info: CPU and memory
- */
- private String resInfo;
-
- private Date createTime;
-
- private Date lastHeartbeatTime;
+ WorkerGroupSource(int code, String desc) {
+ this.code = code;
+ this.desc = desc;
+ }
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java
index 873a69bcaf..4487792703 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/Server.java
@@ -30,12 +30,9 @@ public class Server {
private int port;
- private String zkDirectory;
+ private String serverDirectory;
- /**
- * resource info: CPU and memory
- */
- private String resInfo;
+ private String heartBeatInfo;
private Date createTime;
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
index 8c2727d681..6ccf01e4ab 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
@@ -30,5 +30,5 @@ public class WorkerHeartBeat extends BaseHeartBeat implements
HeartBeat {
private int workerHostWeight; // worker host weight
private double threadPoolUsage; // worker waiting task count
-
+ private String workerGroup;
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroupPageDetail.java
similarity index 59%
copy from
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
copy to
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroupPageDetail.java
index 8c2727d681..4a1d719bd5 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroupPageDetail.java
@@ -15,20 +15,28 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.model;
+package org.apache.dolphinscheduler.dao.entity;
+
+import org.apache.dolphinscheduler.common.enums.WorkerGroupSource;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
-import lombok.experimental.SuperBuilder;
-@Data
@EqualsAndHashCode(callSuper = true)
-@SuperBuilder
+@Data
@NoArgsConstructor
-public class WorkerHeartBeat extends BaseHeartBeat implements HeartBeat {
+public class WorkerGroupPageDetail extends WorkerGroup {
- private int workerHostWeight; // worker host weight
- private double threadPoolUsage; // worker waiting task count
+ private WorkerGroupSource source;
+ public WorkerGroupPageDetail(WorkerGroup workerGroup) {
+ this.setId(workerGroup.getId());
+ this.setName(workerGroup.getName());
+ this.setAddrList(workerGroup.getAddrList());
+ this.setCreateTime(workerGroup.getCreateTime());
+ this.setUpdateTime(workerGroup.getUpdateTime());
+ this.setDescription(workerGroup.getDescription());
+ this.setSystemDefault(workerGroup.isSystemDefault());
+ }
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProjectWorkerGroupMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProjectWorkerGroupMapper.java
index 40353d4c4a..8eb44fb033 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProjectWorkerGroupMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProjectWorkerGroupMapper.java
@@ -19,8 +19,21 @@ package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.ProjectWorkerGroup;
+import org.apache.ibatis.annotations.Param;
+
+import java.util.List;
+import java.util.Set;
+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
public interface ProjectWorkerGroupMapper extends
BaseMapper<ProjectWorkerGroup> {
+ int deleteByProjectCode(@Param("projectCode") Long projectCode);
+
+ Set<String>
queryAssignedWorkerGroupNamesByProjectCode(@Param("projectCode") Long
projectCode);
+
+ int deleteByProjectCodeAndWorkerGroups(@Param("projectCode") Long
projectCode,
+ @Param("workerGroups") List<String>
workerGroups);
+
+ List<ProjectWorkerGroup> queryByProjectCode(@Param("projectCode") Long
projectCode);
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
index 4d8abd1501..d9e322192c 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
@@ -30,9 +30,6 @@ import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
-/**
- * task definition mapper interface
- */
public interface TaskDefinitionMapper extends BaseMapper<TaskDefinition> {
/**
@@ -61,7 +58,7 @@ public interface TaskDefinitionMapper extends
BaseMapper<TaskDefinition> {
* @param projectCode projectCode
* @return task definition list
*/
- List<TaskDefinition> queryAllDefinitionList(@Param("projectCode") long
projectCode);
+ List<String> queryAllTaskDefinitionWorkerGroups(@Param("projectCode") long
projectCode);
/**
* count task definition group by user
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
index fc54b4a0dc..06a39223ef 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.dao.mapper;
+import org.apache.dolphinscheduler.common.enums.WorkerGroupSource;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.ibatis.annotations.Param;
@@ -25,9 +26,6 @@ import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-/**
- * worker group mapper interface
- */
public interface WorkerGroupMapper extends BaseMapper<WorkerGroup> {
/**
@@ -51,4 +49,9 @@ public interface WorkerGroupMapper extends
BaseMapper<WorkerGroup> {
*/
List<WorkerGroup> queryWorkerGroupByName(@Param("name") String name);
+ int updateAddrListByWorkerGroupName(@Param("name") String name,
+ @Param("addrList") String addrList,
+ @Param("source") WorkerGroupSource
source);
+
+ int deleteByWorkerGroupName(@Param("name") String name);
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProjectWorkerGroupMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProjectWorkerGroupDao.java
similarity index 65%
copy from
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProjectWorkerGroupMapper.java
copy to
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProjectWorkerGroupDao.java
index 40353d4c4a..46e2cc7536 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProjectWorkerGroupMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProjectWorkerGroupDao.java
@@ -15,12 +15,20 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.mapper;
+package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProjectWorkerGroup;
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import java.util.List;
+import java.util.Set;
-public interface ProjectWorkerGroupMapper extends
BaseMapper<ProjectWorkerGroup> {
+public interface ProjectWorkerGroupDao extends IDao<ProjectWorkerGroup> {
+ boolean deleteByProjectCode(Long projectCode);
+
+ Set<String> queryAssignedWorkerGroupNamesByProjectCode(Long projectCode);
+
+ boolean deleteByProjectCodeAndWorkerGroups(Long projectCode, List<String>
workerGroups);
+
+ List<ProjectWorkerGroup> queryByProjectCode(Long projectCode);
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java
index 308a1d4a8d..8eb5856c43 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskDefinitionDao.java
@@ -56,4 +56,6 @@ public interface TaskDefinitionDao extends
IDao<TaskDefinition> {
* @return task definition
*/
TaskDefinition queryByCode(long taskCode);
+
+ List<String> queryAllTaskDefinitionWorkerGroups(long projectCode);
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkerGroupDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkerGroupDao.java
index 8ffcff30da..7db0c7d10f 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkerGroupDao.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkerGroupDao.java
@@ -19,5 +19,15 @@ package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
+import java.util.List;
+
public interface WorkerGroupDao extends IDao<WorkerGroup> {
+
+ boolean deleteByWorkerGroupName(String workerGroupName);
+
+ List<String> queryAllWorkerGroupNames();
+
+ List<WorkerGroup> queryAllWorkerGroup();
+
+ List<WorkerGroup> queryWorkerGroupByName(String name);
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProjectWorkerGroupDaoImpl.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProjectWorkerGroupDaoImpl.java
new file mode 100644
index 0000000000..bbc6263bac
--- /dev/null
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProjectWorkerGroupDaoImpl.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import org.apache.dolphinscheduler.dao.entity.ProjectWorkerGroup;
+import org.apache.dolphinscheduler.dao.mapper.ProjectWorkerGroupMapper;
+import org.apache.dolphinscheduler.dao.repository.BaseDao;
+import org.apache.dolphinscheduler.dao.repository.ProjectWorkerGroupDao;
+
+import java.util.List;
+import java.util.Set;
+
+import lombok.NonNull;
+
+import org.springframework.stereotype.Repository;
+
+@Repository
+public class ProjectWorkerGroupDaoImpl extends BaseDao<ProjectWorkerGroup,
ProjectWorkerGroupMapper>
+ implements
+ ProjectWorkerGroupDao {
+
+ public ProjectWorkerGroupDaoImpl(@NonNull ProjectWorkerGroupMapper
projectWorkerGroupMapper) {
+ super(projectWorkerGroupMapper);
+ }
+
+ @Override
+ public boolean deleteByProjectCode(Long projectCode) {
+ int deleted = mybatisMapper.deleteByProjectCode(projectCode);
+ return deleted > 0;
+ }
+
+ @Override
+ public Set<String> queryAssignedWorkerGroupNamesByProjectCode(Long
projectCode) {
+ if (projectCode == null) {
+ return null;
+ }
+ return
mybatisMapper.queryAssignedWorkerGroupNamesByProjectCode(projectCode);
+ }
+
+ @Override
+ public boolean deleteByProjectCodeAndWorkerGroups(Long projectCode,
List<String> workerGroups) {
+ if
(mybatisMapper.queryAssignedWorkerGroupNamesByProjectCode(projectCode).isEmpty())
{
+ return true;
+ }
+ int deleted =
mybatisMapper.deleteByProjectCodeAndWorkerGroups(projectCode, workerGroups);
+ return deleted > 0;
+ }
+
+ @Override
+ public List<ProjectWorkerGroup> queryByProjectCode(Long projectCode) {
+ return mybatisMapper.queryByProjectCode(projectCode);
+ }
+}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java
index 7037e2370a..ca3c9b2e6f 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskDefinitionDaoImpl.java
@@ -45,9 +45,6 @@ import org.springframework.stereotype.Repository;
import com.google.common.collect.Lists;
-/**
- * Task Definition DAO Implementation
- */
@Repository
@Slf4j
public class TaskDefinitionDaoImpl extends BaseDao<TaskDefinition,
TaskDefinitionMapper> implements TaskDefinitionDao {
@@ -120,4 +117,8 @@ public class TaskDefinitionDaoImpl extends
BaseDao<TaskDefinition, TaskDefinitio
return mybatisMapper.queryByCode(taskCode);
}
+ @Override
+ public List<String> queryAllTaskDefinitionWorkerGroups(long projectCode) {
+ return mybatisMapper.queryAllTaskDefinitionWorkerGroups(projectCode);
+ }
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkerGroupDaoImpl.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkerGroupDaoImpl.java
index 2151a6c216..b5203bb661 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkerGroupDaoImpl.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkerGroupDaoImpl.java
@@ -22,6 +22,9 @@ import
org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.dao.repository.BaseDao;
import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao;
+import java.util.List;
+import java.util.stream.Collectors;
+
import lombok.NonNull;
import org.springframework.stereotype.Repository;
@@ -33,4 +36,26 @@ public class WorkerGroupDaoImpl extends BaseDao<WorkerGroup,
WorkerGroupMapper>
super(workerGroupMapper);
}
+ @Override
+ public boolean deleteByWorkerGroupName(String workerGroupName) {
+ int deleted = mybatisMapper.deleteByWorkerGroupName(workerGroupName);
+ return deleted > 0;
+ }
+
+ @Override
+ public List<String> queryAllWorkerGroupNames() {
+ return mybatisMapper.queryAllWorkerGroup().stream()
+ .map(WorkerGroup::getName)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public List<WorkerGroup> queryAllWorkerGroup() {
+ return mybatisMapper.queryAllWorkerGroup();
+ }
+
+ @Override
+ public List<WorkerGroup> queryWorkerGroupByName(String name) {
+ return mybatisMapper.queryWorkerGroupByName(name);
+ }
}
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtils.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtils.java
index 7535ddb809..1707425940 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtils.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtils.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.dao.utils;
+import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.commons.lang3.StringUtils;
@@ -54,7 +55,7 @@ public class WorkerGroupUtils {
if (StringUtils.isEmpty(addrList)) {
return Collections.emptyList();
}
- return Lists.newArrayList(addrList.split(","));
+ return Lists.newArrayList(addrList.split(Constants.COMMA));
}
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectWorkerGroupMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectWorkerGroupMapper.xml
new file mode 100644
index 0000000000..8dcea4ceaa
--- /dev/null
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectWorkerGroupMapper.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
+<mapper
namespace="org.apache.dolphinscheduler.dao.mapper.ProjectWorkerGroupMapper">
+ <delete id="deleteByProjectCode">
+ delete from t_ds_relation_project_worker_group
+ where 1=1
+ and project_code = #{projectCode}
+ </delete>
+
+ <select id="queryAssignedWorkerGroupNamesByProjectCode"
resultType="java.lang.String">
+ select
+ worker_group
+ from
+ t_ds_relation_project_worker_group
+ where 1=1
+ and project_code = #{projectCode}
+ group by worker_group
+ </select>
+
+ <delete id="deleteByProjectCodeAndWorkerGroups">
+ delete from t_ds_relation_project_worker_group
+ where 1=1
+ and project_code = #{projectCode}
+ and worker_group in
+ <foreach collection="workerGroups" item="workerGroup" separator=","
open="(" close=")">
+ #{workerGroup}
+ </foreach>
+ </delete>
+
+ <select id="queryByProjectCode"
resultType="org.apache.dolphinscheduler.dao.entity.ProjectWorkerGroup">
+ select
+ id
+ ,project_code
+ ,worker_group
+ ,create_time
+ ,update_time
+ from
+ t_ds_relation_project_worker_group
+ where 1=1
+ and project_code = #{projectCode}
+ </select>
+</mapper>
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
index 453246e687..fe154bd6c2 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
@@ -43,12 +43,12 @@
and td.code = ptr.post_task_code
limit 1
</select>
- <select id="queryAllDefinitionList"
resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
+ <select id="queryAllTaskDefinitionWorkerGroups"
resultType="java.lang.String">
select
- <include refid="baseSql"/>
+ worker_group
from t_ds_task_definition
where project_code = #{projectCode}
- order by create_time desc
+ group by worker_group
</select>
<select id="countDefinitionGroupByUser"
resultType="org.apache.dolphinscheduler.dao.model.WorkflowDefinitionCountDto">
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml
index 79305d1467..7dd29edd0b 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml
@@ -29,4 +29,14 @@
where name = #{name}
</select>
+ <update id="updateAddrListByWorkerGroupName">
+ update t_ds_worker_group
+ set addr_list = #{addrList}, source = #{source}
+ where name = #{name}
+ </update>
+
+ <delete id="deleteByWorkerGroupName">
+ delete from t_ds_worker_group
+ where name = #{name}
+ </delete>
</mapper>
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
index 7826be4846..eee04391e6 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
@@ -492,7 +492,8 @@ CREATE TABLE `t_ds_task_definition` (
`memory_max` int(11) DEFAULT '-1' NOT NULL COMMENT 'MemoryMax(MB):
-1:Infinity',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time',
- PRIMARY KEY (`id`,`code`)
+ PRIMARY KEY (`id`,`code`),
+ KEY `idx_project_code` (`project_code`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE = utf8_bin;
-- ----------------------------
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
index e31ba9184d..9ff1ab5b6d 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
+++
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
@@ -925,7 +925,7 @@ CREATE TABLE t_ds_worker_group (
addr_list text DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
- description text DEFAULT NULL,
+ description text DEFAULT NULL,
PRIMARY KEY (id) ,
CONSTRAINT name_unique UNIQUE (name)
) ;
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql
index 935ece7ca8..a57326e680 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql
+++
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql
@@ -243,4 +243,6 @@ END;
d//
delimiter ;
CALL drop_column_t_ds_worker_group_other_params_json;
-DROP PROCEDURE drop_column_t_ds_worker_group_other_params_json;
\ No newline at end of file
+DROP PROCEDURE drop_column_t_ds_worker_group_other_params_json;
+
+ALTER TABLE `t_ds_task_definition` ADD INDEX `idx_project_code` USING BTREE
(`project_code`);
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql
index 9b9812033a..14e07d5298 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql
+++
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql
@@ -246,4 +246,5 @@ $$ LANGUAGE plpgsql;
d//
select drop_column_t_ds_worker_group_other_params_json();
-DROP FUNCTION IF EXISTS drop_column_t_ds_worker_group_other_params_json();
\ No newline at end of file
+DROP FUNCTION IF EXISTS drop_column_t_ds_worker_group_other_params_json();
+
diff --git
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java
index 7533b44a22..da3cc1d274 100644
---
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java
+++
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java
@@ -107,10 +107,10 @@ public class TaskDefinitionMapperTest extends BaseDaoTest
{
}
@Test
- public void testQueryAllDefinitionList() {
+ public void testQueryAllTaskDefinitionWorkerGroups() {
TaskDefinition taskDefinition = insertOne();
- List<TaskDefinition> taskDefinitions =
-
taskDefinitionMapper.queryAllDefinitionList(taskDefinition.getProjectCode());
+ List<String> taskDefinitions =
+
taskDefinitionMapper.queryAllTaskDefinitionWorkerGroups(taskDefinition.getProjectCode());
Assertions.assertNotEquals(0, taskDefinitions.size());
}
diff --git
a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/ProjectE2ETest.java
b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/ProjectE2ETest.java
index 2e2d1d7f23..18ecdb6768 100644
---
a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/ProjectE2ETest.java
+++
b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/ProjectE2ETest.java
@@ -23,6 +23,8 @@ import org.apache.dolphinscheduler.e2e.core.DolphinScheduler;
import org.apache.dolphinscheduler.e2e.pages.LoginPage;
import org.apache.dolphinscheduler.e2e.pages.project.ProjectPage;
+import java.util.UUID;
+
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
@@ -34,7 +36,9 @@ import org.testcontainers.shaded.org.awaitility.Awaitility;
@DisableIfTestFails
class ProjectE2ETest {
- private static final String project = "test-project-1";
+ private static final String project = "test-project-" + UUID.randomUUID();
+
+ private static final String workerGroup = "default";
private static RemoteWebDriver browser;
@@ -48,7 +52,23 @@ class ProjectE2ETest {
@Test
@Order(1)
void testCreateProject() {
- new ProjectPage(browser).create(project);
+ final ProjectPage page = new ProjectPage(browser);
+ page.create(project);
+
+ Awaitility.await().untilAsserted(() -> {
+ browser.navigate().refresh();
+ assertThat(
+ page.projectList()).anyMatch(
+ it -> it.getText().contains(project));
+ });
+ }
+
+ @Test
+ @Order(5)
+ void testAssignWorkerGroup() {
+ final ProjectPage page = new ProjectPage(browser);
+ page.assignWorkerGroup(project, workerGroup);
+ page.verifyAssignedWorkerGroup(project, workerGroup);
}
@Test
diff --git
a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/project/ProjectPage.java
b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/project/ProjectPage.java
index 33f42f92cc..02de0daef1 100644
---
a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/project/ProjectPage.java
+++
b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/project/ProjectPage.java
@@ -28,7 +28,6 @@ import java.util.List;
import lombok.Getter;
import org.openqa.selenium.By;
-import org.openqa.selenium.JavascriptExecutor;
import org.openqa.selenium.WebElement;
import org.openqa.selenium.remote.RemoteWebDriver;
import org.openqa.selenium.support.FindBy;
@@ -52,10 +51,13 @@ public final class ProjectPage extends NavBarPage
implements NavBarItem {
private final CreateProjectForm createProjectForm;
+ private final AssignWorkerGroupForm assignWorkerGroupForm;
+
public ProjectPage(RemoteWebDriver driver) {
super(driver);
this.createProjectForm = new CreateProjectForm();
+ this.assignWorkerGroupForm = new AssignWorkerGroupForm();
PageFactory.initElements(driver, this);
}
@@ -69,6 +71,7 @@ public final class ProjectPage extends NavBarPage implements
NavBarItem {
public ProjectPage createProjectUntilSuccess(String project) {
create(project);
+ assignWorkerGroup(project, "default");
await().untilAsserted(() -> assertThat(projectList())
.as("project list should contain newly-created project")
.anyMatch(it -> it.getText().contains(project)));
@@ -83,7 +86,47 @@ public final class ProjectPage extends NavBarPage implements
NavBarItem {
.orElseThrow(() -> new RuntimeException("Cannot find project:
" + project))
.findElement(By.className("delete")).click();
- ((JavascriptExecutor) driver).executeScript("arguments[0].click();",
buttonConfirm());
+ driver.executeScript("arguments[0].click();", buttonConfirm());
+
+ return this;
+ }
+
+ public ProjectPage assignWorkerGroup(String project, String workerGroup) {
+ projectList()
+ .stream()
+ .filter(it -> it.getText().contains(project))
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("Can not find project:
" + project))
+ .findElement(By.className("assign-worker-group-btn")).click();
+
+ assignWorkerGroupForm.sourceWorkerGroups()
+ .stream()
+ .filter(it -> it.getText().contains(workerGroup))
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("Can not find source
worker group: " + workerGroup))
+ .click();
+
+ assignWorkerGroupForm.buttonSubmit().click();
+
+ return this;
+ }
+
+ public ProjectPage verifyAssignedWorkerGroup(String project, String
workerGroup) {
+ projectList()
+ .stream()
+ .filter(it -> it.getText().contains(project))
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("Can not find project:
" + project))
+ .findElement(By.className("assign-worker-group-btn")).click();
+
+ assignWorkerGroupForm.targetWorkerGroups()
+ .stream()
+ .filter(it -> it.getText().contains(workerGroup))
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("Can not find target
worker group: " + workerGroup))
+ .click();
+
+ assignWorkerGroupForm.buttonCancel().click();
return this;
}
@@ -115,4 +158,38 @@ public final class ProjectPage extends NavBarPage
implements NavBarItem {
@FindBy(className = "btn-submit")
private WebElement buttonSubmit;
}
+
+ @Getter
+ public class AssignWorkerGroupForm {
+
+ AssignWorkerGroupForm() {
+ PageFactory.initElements(driver, this);
+ }
+
+ @FindBys({
+ @FindBy(className = "assign-worker-group-modal"),
+ @FindBy(className = "n-transfer-list--source"),
+ @FindBy(className = "n-transfer-list-item__label")
+ })
+ private List<WebElement> sourceWorkerGroups;
+
+ @FindBys({
+ @FindBy(className = "assign-worker-group-modal"),
+ @FindBy(className = "n-transfer-list--target"),
+ @FindBy(className = "n-transfer-list-item__label")
+ })
+ private List<WebElement> targetWorkerGroups;
+
+ @FindBys({
+ @FindBy(className = "assign-worker-group-modal"),
+ @FindBy(className = "btn-submit"),
+ })
+ private WebElement buttonSubmit;
+
+ @FindBys({
+ @FindBy(className = "assign-worker-group-modal"),
+ @FindBy(className = "btn-cancel"),
+ })
+ private WebElement buttonCancel;
+ }
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java
index bb49133351..3f875f7047 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java
@@ -31,10 +31,10 @@ import org.springframework.stereotype.Component;
public class ClusterManager {
@Getter
- private final MasterClusters masterClusters;
+ private MasterClusters masterClusters;
@Getter
- private final WorkerClusters workerClusters;
+ private WorkerClusters workerClusters;
@Autowired
private WorkerGroupChangeNotifier workerGroupChangeNotifier;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
index 1005de4c50..c49833d5da 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.cluster;
+import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
import lombok.Data;
@@ -33,7 +34,7 @@ public class MasterServerMetadata extends BaseServerMetadata
implements Comparab
public static MasterServerMetadata parseFromHeartBeat(final
MasterHeartBeat masterHeartBeat) {
return MasterServerMetadata.builder()
.serverStartupTime(masterHeartBeat.getStartupTime())
- .address(masterHeartBeat.getHost() + ":" +
masterHeartBeat.getPort())
+ .address(masterHeartBeat.getHost() + Constants.COLON +
masterHeartBeat.getPort())
.cpuUsage(masterHeartBeat.getCpuUsage())
.memoryUsage(masterHeartBeat.getMemoryUsage())
.serverStatus(masterHeartBeat.getServerStatus())
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java
index 291b2f1758..509c54f3fb 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java
@@ -89,8 +89,10 @@ public class WorkerClusters extends
AbstractClusterSubscribeListener<WorkerServe
@Override
public void onWorkerGroupDelete(List<WorkerGroup> workerGroups) {
- for (WorkerGroup workerGroup : workerGroups) {
- workerGroupMapping.remove(workerGroup.getName());
+ synchronized (workerGroupMapping) {
+ for (WorkerGroup workerGroup : workerGroups) {
+ workerGroupMapping.remove(workerGroup.getName());
+ }
}
}
@@ -110,7 +112,9 @@ public class WorkerClusters extends
AbstractClusterSubscribeListener<WorkerServe
.filter(Objects::nonNull)
.map(WorkerServerMetadata::getAddress)
.collect(Collectors.toList());
- workerGroupMapping.put(workerGroup.getName(), activeWorkers);
+ synchronized (workerGroupMapping) {
+ workerGroupMapping.put(workerGroup.getName(), activeWorkers);
+ }
}
}
@@ -126,6 +130,17 @@ public class WorkerClusters extends
AbstractClusterSubscribeListener<WorkerServe
@Override
public void onServerAdded(WorkerServerMetadata workerServer) {
workerMapping.put(workerServer.getAddress(), workerServer);
+ synchronized (workerGroupMapping) {
+ List<String> addWorkerGroupAddrList =
workerGroupMapping.get(workerServer.getWorkerGroup());
+ if (addWorkerGroupAddrList == null) {
+ List<String> newWorkerGroupAddrList = new ArrayList<>();
+ newWorkerGroupAddrList.add(workerServer.getAddress());
+ workerGroupMapping.put(workerServer.getWorkerGroup(),
newWorkerGroupAddrList);
+ } else if
(!addWorkerGroupAddrList.contains(workerServer.getAddress())) {
+ addWorkerGroupAddrList.add(workerServer.getAddress());
+ workerGroupMapping.put(workerServer.getWorkerGroup(),
addWorkerGroupAddrList);
+ }
+ }
for (IClustersChangeListener<WorkerServerMetadata> listener :
workerClusterChangeListeners) {
listener.onServerAdded(workerServer);
}
@@ -134,6 +149,15 @@ public class WorkerClusters extends
AbstractClusterSubscribeListener<WorkerServe
@Override
public void onServerRemove(WorkerServerMetadata workerServer) {
workerMapping.remove(workerServer.getAddress(), workerServer);
+ synchronized (workerGroupMapping) {
+ List<String> removeWorkerGroupAddrList =
workerGroupMapping.get(workerServer.getWorkerGroup());
+ if (removeWorkerGroupAddrList != null &&
removeWorkerGroupAddrList.contains(workerServer.getAddress())) {
+ removeWorkerGroupAddrList.remove(workerServer.getAddress());
+ if (removeWorkerGroupAddrList.isEmpty()) {
+ workerGroupMapping.remove(workerServer.getWorkerGroup());
+ }
+ }
+ }
for (IClustersChangeListener<WorkerServerMetadata> listener :
workerClusterChangeListeners) {
listener.onServerRemove(workerServer);
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java
index 827ebfc576..cb1eb36bcc 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java
@@ -85,7 +85,7 @@ public class WorkerGroupChangeNotifier {
}
private MapComparator<String, WorkerGroup> detectChangedWorkerGroups() {
- final Map<String, WorkerGroup> tmpWorkerGroupMap =
workerGroupDao.queryAll()
+ Map<String, WorkerGroup> tmpWorkerGroupMap = workerGroupDao.queryAll()
.stream()
.collect(Collectors.toMap(WorkerGroup::getName, workerGroup ->
workerGroup));
return new MapComparator<>(workerGroupMap, tmpWorkerGroupMap);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java
index de9a03506d..d853c7d061 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.cluster;
+import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import lombok.Builder;
@@ -41,7 +42,8 @@ public class WorkerServerMetadata extends BaseServerMetadata {
public static WorkerServerMetadata parseFromHeartBeat(final
WorkerHeartBeat workerHeartBeat) {
return WorkerServerMetadata.builder()
.serverStartupTime(workerHeartBeat.getStartupTime())
- .address(workerHeartBeat.getHost() + ":" +
workerHeartBeat.getPort())
+ .address(workerHeartBeat.getHost() + Constants.COLON +
workerHeartBeat.getPort())
+ .workerGroup(workerHeartBeat.getWorkerGroup())
.cpuUsage(workerHeartBeat.getCpuUsage())
.memoryUsage(workerHeartBeat.getMemoryUsage())
.serverStatus(workerHeartBeat.getServerStatus())
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/client/PhysicalTaskExecutorClientDelegator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/client/PhysicalTaskExecutorClientDelegator.java
index f546033436..f7566d3133 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/client/PhysicalTaskExecutorClientDelegator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/client/PhysicalTaskExecutorClientDelegator.java
@@ -64,8 +64,9 @@ public class PhysicalTaskExecutorClientDelegator implements
ITaskExecutorClientD
.map(Host::of)
.map(Host::getAddress)
.orElseThrow(() -> new TaskDispatchException(
- String.format("Cannot find the host to dispatch
Task[id=%s, name=%s]",
- taskExecutionContext.getTaskInstanceId(),
taskName)));
+ String.format("Cannot find the host to dispatch
Task[id=%s, name=%s, workerGroup=%s]",
+ taskExecutionContext.getTaskInstanceId(),
taskName,
+ taskExecutionContext.getWorkerGroup())));
taskExecutionContext.setHost(physicalTaskExecutorAddress);
taskExecutionRunnable.getTaskInstance().setHost(physicalTaskExecutorAddress);
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadataTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadataTest.java
index 16e30b9cd8..e13555b09d 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadataTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadataTest.java
@@ -42,6 +42,7 @@ class WorkerServerMetadataTest {
.port(12345)
.workerHostWeight(2)
.threadPoolUsage(0.6)
+ .workerGroup("test")
.build();
WorkerServerMetadata workerServerMetadata =
WorkerServerMetadata.parseFromHeartBeat(workerHeartBeat);
Truth.assertThat(workerServerMetadata.getCpuUsage()).isEqualTo(0.2);
@@ -50,5 +51,6 @@ class WorkerServerMetadataTest {
Truth.assertThat(workerServerMetadata.getAddress()).isEqualTo("localhost:12345");
Truth.assertThat(workerServerMetadata.getWorkerWeight()).isEqualTo(2);
Truth.assertThat(workerServerMetadata.getTaskThreadPoolUsage()).isEqualTo(0.6);
+
Truth.assertThat(workerServerMetadata.getWorkerGroup()).isEqualTo("test");
}
}
diff --git a/dolphinscheduler-master/src/test/resources/application.yaml
b/dolphinscheduler-master/src/test/resources/application.yaml
index eea966c659..8fdf521cde 100644
--- a/dolphinscheduler-master/src/test/resources/application.yaml
+++ b/dolphinscheduler-master/src/test/resources/application.yaml
@@ -52,6 +52,7 @@ master:
# The number of threads used to execute logic task.
task-executor-thread-count: 4
max-heartbeat-interval: 10s
+ group: default
server-load-protection:
# If set true, will open master overload protection
enabled: true
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
index df0587019a..75b817c2d9 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
@@ -125,9 +125,9 @@ public class RegistryClient {
log.warn("unknown registry node type: {}",
registryNodeType);
}
- server.setResInfo(heartBeatJson);
+ server.setHeartBeatInfo(heartBeatJson);
// todo: add host, port in heartBeat Info, so that we don't need
to parse this again
- server.setZkDirectory(registryNodeType.getRegistryPath() + "/" +
serverPath);
+ server.setServerDirectory(registryNodeType.getRegistryPath() + "/"
+ serverPath);
serverList.add(server);
}
return serverList;
diff --git a/dolphinscheduler-standalone-server/pom.xml
b/dolphinscheduler-standalone-server/pom.xml
index 97f82dd3da..b98a7e5ea8 100644
--- a/dolphinscheduler-standalone-server/pom.xml
+++ b/dolphinscheduler-standalone-server/pom.xml
@@ -54,12 +54,19 @@
<artifactId>dolphinscheduler-api</artifactId>
<scope>provided</scope>
</dependency>
+
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-alert-server</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-alert-all</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-all</artifactId>
diff --git
a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
index 73a307cffb..31896727df 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
+++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
@@ -194,6 +194,8 @@ worker:
max-heartbeat-interval: 10s
# worker host weight to dispatch tasks, default value 100
host-weight: 100
+ # worker group name
+ group: default
server-load-protection:
enabled: true
# Worker max system cpu usage, when the worker's system cpu usage is
smaller then this value, worker server can be dispatched tasks.
diff --git a/dolphinscheduler-ui/src/locales/en_US/security.ts
b/dolphinscheduler-ui/src/locales/en_US/security.ts
index a79211bec7..15e1c1fc2c 100644
--- a/dolphinscheduler-ui/src/locales/en_US/security.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/security.ts
@@ -64,6 +64,7 @@ export default {
group_name_tips: 'Please enter your group name',
worker_addresses: 'Worker Addresses',
worker_addresses_tips: 'Please select worker addresses',
+ source: 'Source',
create_time: 'Create Time',
update_time: 'Update Time'
},
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/security.ts
b/dolphinscheduler-ui/src/locales/zh_CN/security.ts
index 03a4ce7051..fc0a00f5e1 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/security.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/security.ts
@@ -64,6 +64,7 @@ export default {
group_name_tips: '请输入分组名称',
worker_addresses: 'Worker地址',
worker_addresses_tips: '请选择Worker地址',
+ source: '来源',
create_time: '创建时间',
update_time: '更新时间'
},
diff --git
a/dolphinscheduler-ui/src/views/projects/list/components/use-worker-group.ts
b/dolphinscheduler-ui/src/views/projects/list/components/use-worker-group.ts
index 2144bce057..8257b3a3c6 100644
--- a/dolphinscheduler-ui/src/views/projects/list/components/use-worker-group.ts
+++ b/dolphinscheduler-ui/src/views/projects/list/components/use-worker-group.ts
@@ -44,8 +44,7 @@ export function useWorkerGroup(
for (const workerGroup of res) {
variables.model.workerGroupOptions.push({
label: workerGroup,
- value: workerGroup,
- disabled: workerGroup === 'default'
+ value: workerGroup
})
}
})
@@ -63,10 +62,8 @@ export function useWorkerGroup(
initOptions()
const handleValidate = () => {
- if (variables.model?.assignedWorkerGroups.length > 0) {
- submitModal()
- ctx.emit('confirmModal', props.showModalRef)
- }
+ submitModal()
+ ctx.emit('confirmModal', props.showModalRef)
}
const submitModal = async () => {
diff --git
a/dolphinscheduler-ui/src/views/projects/list/components/worker-group-modal.tsx
b/dolphinscheduler-ui/src/views/projects/list/components/worker-group-modal.tsx
index 47b303611b..1218b00f2e 100644
---
a/dolphinscheduler-ui/src/views/projects/list/components/worker-group-modal.tsx
+++
b/dolphinscheduler-ui/src/views/projects/list/components/worker-group-modal.tsx
@@ -72,6 +72,7 @@ const WorkerGroupModal = defineComponent({
return (
<Modal
title={t('project.list.assign_worker_group')}
+ class={'assign-worker-group-modal'}
show={this.showModalRef}
onConfirm={this.confirmModal}
onCancel={this.cancelModal}
diff --git a/dolphinscheduler-ui/src/views/projects/list/index.tsx
b/dolphinscheduler-ui/src/views/projects/list/index.tsx
index 8971eccea0..1ad4bb2edb 100644
--- a/dolphinscheduler-ui/src/views/projects/list/index.tsx
+++ b/dolphinscheduler-ui/src/views/projects/list/index.tsx
@@ -156,7 +156,6 @@ const list = defineComponent({
<NPagination
v-model:page={this.page}
v-model:page-size={this.pageSize}
- page-count={this.totalPage}
show-size-picker
page-sizes={[10, 30, 50]}
show-quick-jumper
diff --git a/dolphinscheduler-ui/src/views/projects/list/use-table.ts
b/dolphinscheduler-ui/src/views/projects/list/use-table.ts
index 0ef367aa6e..8516753e80 100644
--- a/dolphinscheduler-ui/src/views/projects/list/use-table.ts
+++ b/dolphinscheduler-ui/src/views/projects/list/use-table.ts
@@ -182,7 +182,7 @@ export function useTable() {
circle: true,
type: 'info',
size: 'small',
- class: 'edit',
+ class: 'assign-worker-group-btn',
onClick: () => {
handleAssign(row)
}
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-worker-group.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-worker-group.ts
index 9b452b541a..4016844113 100644
---
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-worker-group.ts
+++
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-worker-group.ts
@@ -55,6 +55,6 @@ export function useWorkerGroup(projectCode: number):
IJsonItem {
required: true,
message: t('project.node.worker_group_tips')
},
- value: 'default'
+ value: options.value.length > 0 ? options.value[0].value : ''
}
}
diff --git
a/dolphinscheduler-ui/src/views/projects/task/instance/batch-task.tsx
b/dolphinscheduler-ui/src/views/projects/task/instance/batch-task.tsx
index 36d7f1ce96..8ee924fe23 100644
--- a/dolphinscheduler-ui/src/views/projects/task/instance/batch-task.tsx
+++ b/dolphinscheduler-ui/src/views/projects/task/instance/batch-task.tsx
@@ -294,7 +294,6 @@ const BatchTaskInstance = defineComponent({
<NPagination
v-model:page={this.page}
v-model:page-size={this.pageSize}
- page-count={this.totalPage}
show-size-picker
page-sizes={[10, 30, 50]}
show-quick-jumper
diff --git
a/dolphinscheduler-ui/src/views/projects/task/instance/stream-task.tsx
b/dolphinscheduler-ui/src/views/projects/task/instance/stream-task.tsx
index 2fac00df67..9441185259 100644
--- a/dolphinscheduler-ui/src/views/projects/task/instance/stream-task.tsx
+++ b/dolphinscheduler-ui/src/views/projects/task/instance/stream-task.tsx
@@ -266,7 +266,6 @@ const BatchTaskInstance = defineComponent({
<NPagination
v-model:page={this.page}
v-model:page-size={this.pageSize}
- page-count={this.totalPage}
show-size-picker
page-sizes={[10, 30, 50]}
show-quick-jumper
diff --git
a/dolphinscheduler-ui/src/views/projects/workflow/definition/index.tsx
b/dolphinscheduler-ui/src/views/projects/workflow/definition/index.tsx
index 33d0ced9fd..29d9a8ef6a 100644
--- a/dolphinscheduler-ui/src/views/projects/workflow/definition/index.tsx
+++ b/dolphinscheduler-ui/src/views/projects/workflow/definition/index.tsx
@@ -274,7 +274,6 @@ export default defineComponent({
<NPagination
v-model:page={this.page}
v-model:page-size={this.pageSize}
- page-count={this.totalPage}
show-size-picker
page-sizes={[10, 30, 50]}
show-quick-jumper
diff --git a/dolphinscheduler-ui/src/views/projects/workflow/instance/index.tsx
b/dolphinscheduler-ui/src/views/projects/workflow/instance/index.tsx
index b628cad59f..c4de57aaba 100644
--- a/dolphinscheduler-ui/src/views/projects/workflow/instance/index.tsx
+++ b/dolphinscheduler-ui/src/views/projects/workflow/instance/index.tsx
@@ -116,7 +116,6 @@ export default defineComponent({
<NPagination
v-model:page={this.page}
v-model:page-size={this.pageSize}
- page-count={this.totalPage}
show-size-picker
page-sizes={[10, 30, 50]}
show-quick-jumper
diff --git
a/dolphinscheduler-ui/src/views/security/worker-group-manage/use-table.ts
b/dolphinscheduler-ui/src/views/security/worker-group-manage/use-table.ts
index 796fc03c7c..3213d76fc2 100644
--- a/dolphinscheduler-ui/src/views/security/worker-group-manage/use-table.ts
+++ b/dolphinscheduler-ui/src/views/security/worker-group-manage/use-table.ts
@@ -67,6 +67,11 @@ export function useTable() {
)
})
},
+ {
+ title: t('security.worker_group.source'),
+ key: 'source',
+ className: 'source'
+ },
{
title: t('security.worker_group.create_time'),
key: 'createTime'
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index b092199c71..c92caa79c0 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -44,6 +44,7 @@ public class WorkerConfig implements Validator {
private Duration maxHeartbeatInterval = Duration.ofSeconds(10);
private int hostWeight = 100;
private WorkerServerLoadProtection serverLoadProtection = new
WorkerServerLoadProtection();
+ private String group;
/**
* This field doesn't need to set at config file, it will be calculated by
workerIp:listenPort
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
index f5bce3b766..739cf95e22 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
@@ -79,6 +79,7 @@ public class WorkerHeartBeatTask extends
BaseHeartBeatTask<WorkerHeartBeat> {
.serverStatus(serverStatus)
.host(NetUtils.getHost())
.port(workerConfig.getListenPort())
+ .workerGroup(workerConfig.getGroup())
.build();
}
diff --git a/dolphinscheduler-worker/src/main/resources/application.yaml
b/dolphinscheduler-worker/src/main/resources/application.yaml
index d9da1ec244..f463eff337 100644
--- a/dolphinscheduler-worker/src/main/resources/application.yaml
+++ b/dolphinscheduler-worker/src/main/resources/application.yaml
@@ -46,6 +46,8 @@ worker:
max-heartbeat-interval: 10s
# worker host weight to dispatch tasks, default value 100
host-weight: 100
+ # worker group name
+ group: default
server-load-protection:
# If set true, will open worker overload protection
enabled: true