This is an automated email from the ASF dual-hosted git repository.
gallardot pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 566651c6c5 [Improvement-16880] Merge worker group from config and ui
and distinct display it in api (#16883)
566651c6c5 is described below
commit 566651c6c58672deaf438d29fcf2f644c67a9160
Author: xiangzihao <[email protected]>
AuthorDate: Mon Dec 9 13:27:24 2024 +0800
[Improvement-16880] Merge worker group from config and ui and distinct
display it in api (#16883)
---
.../ProjectWorkerGroupRelationServiceImpl.java | 2 +-
.../api/service/impl/WorkerGroupServiceImpl.java | 3 +-
.../server/master/cluster/WorkerClusters.java | 56 +++++++++++++++-------
.../server/master/cluster/WorkerClustersTest.java | 16 +++----
.../src/main/resources/application.yaml | 2 +-
.../server/worker/config/WorkerConfig.java | 6 +++
6 files changed, 56 insertions(+), 29 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java
index 3e6e4617f5..94c6eec53b 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java
@@ -187,7 +187,7 @@ public class ProjectWorkerGroupRelationServiceImpl extends
BaseServiceImpl
projectWorkerGroup.setProjectCode(projectCode);
projectWorkerGroup.setWorkerGroup(workerGroup);
return projectWorkerGroup;
- }).collect(Collectors.toList());
+ }).distinct().collect(Collectors.toList());
result.put(Constants.DATA_LIST, projectWorkerGroups);
putMsg(result, Status.SUCCESS);
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index 52db3bb7b0..eefe307670 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -303,7 +303,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl
implements WorkerGro
.map(WorkerGroup::getName)
.collect(Collectors.toList());
availableWorkerGroupList.addAll(configWorkerGroupNames);
- result.put(Constants.DATA_LIST, availableWorkerGroupList);
+ result.put(Constants.DATA_LIST,
availableWorkerGroupList.stream().distinct().collect(Collectors.toList()));
putMsg(result, Status.SUCCESS);
return result;
}
@@ -362,6 +362,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl
implements WorkerGro
}
workerGroupDao.deleteById(id);
+ boardCastToMasterThatWorkerGroupChanged();
log.info("Delete worker group complete, workerGroupName:{}.",
workerGroup.getName());
putMsg(result, Status.SUCCESS);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java
index 509c54f3fb..1716e78b07 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java
@@ -43,8 +43,11 @@ public class WorkerClusters extends
AbstractClusterSubscribeListener<WorkerServe
// WorkerIdentifier(workerAddress) -> worker
private final Map<String, WorkerServerMetadata> workerMapping = new
ConcurrentHashMap<>();
- // WorkerGroup -> WorkerIdentifier(workerAddress)
- private final Map<String, List<String>> workerGroupMapping = new
ConcurrentHashMap<>();
+ // WorkerGroup from db -> WorkerIdentifier(workerAddress)
+ private final Map<String, List<String>> dbWorkerGroupMapping = new
ConcurrentHashMap<>();
+
+ // WorkerGroup from config -> WorkerIdentifier(workerAddress)
+ private final Map<String, List<String>> configWorkerGroupMapping = new
ConcurrentHashMap<>();
private final List<IClustersChangeListener<WorkerServerMetadata>>
workerClusterChangeListeners =
new CopyOnWriteArrayList<>();
@@ -59,27 +62,44 @@ public class WorkerClusters extends
AbstractClusterSubscribeListener<WorkerServe
return Optional.ofNullable(workerMapping.get(address));
}
- public List<String> getWorkerServerAddressByGroup(String workerGroup) {
+ public List<String> getDbWorkerServerAddressByGroup(String workerGroup) {
+ if (WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)) {
+ return UnmodifiableList.unmodifiableList(new
ArrayList<>(workerMapping.keySet()));
+ }
+ return dbWorkerGroupMapping.getOrDefault(workerGroup,
Collections.emptyList());
+ }
+
+ public List<String> getConfigWorkerServerAddressByGroup(String
workerGroup) {
if (WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)) {
return UnmodifiableList.unmodifiableList(new
ArrayList<>(workerMapping.keySet()));
}
- return workerGroupMapping.getOrDefault(workerGroup,
Collections.emptyList());
+ return configWorkerGroupMapping.getOrDefault(workerGroup,
Collections.emptyList());
}
public List<String> getNormalWorkerServerAddressByGroup(String
workerGroup) {
- List<String> normalWorkerAddresses =
getWorkerServerAddressByGroup(workerGroup)
+ List<String> dbWorkerAddresses =
getDbWorkerServerAddressByGroup(workerGroup)
+ .stream()
+ .map(workerMapping::get)
+ .filter(Objects::nonNull)
+ .filter(workerServer -> workerServer.getServerStatus() ==
ServerStatus.NORMAL)
+ .map(WorkerServerMetadata::getAddress)
+ .collect(Collectors.toList());
+ List<String> configWorkerAddresses =
getConfigWorkerServerAddressByGroup(workerGroup)
.stream()
.map(workerMapping::get)
.filter(Objects::nonNull)
.filter(workerServer -> workerServer.getServerStatus() ==
ServerStatus.NORMAL)
.map(WorkerServerMetadata::getAddress)
.collect(Collectors.toList());
- return UnmodifiableList.unmodifiableList(normalWorkerAddresses);
+ dbWorkerAddresses.removeAll(configWorkerAddresses);
+ dbWorkerAddresses.addAll(configWorkerAddresses);
+ return UnmodifiableList.unmodifiableList(dbWorkerAddresses);
}
public boolean containsWorkerGroup(String workerGroup) {
return WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)
- || workerGroupMapping.containsKey(workerGroup);
+ || dbWorkerGroupMapping.containsKey(workerGroup)
+ || configWorkerGroupMapping.containsKey(workerGroup);
}
@Override
@@ -89,9 +109,9 @@ public class WorkerClusters extends
AbstractClusterSubscribeListener<WorkerServe
@Override
public void onWorkerGroupDelete(List<WorkerGroup> workerGroups) {
- synchronized (workerGroupMapping) {
+ synchronized (dbWorkerGroupMapping) {
for (WorkerGroup workerGroup : workerGroups) {
- workerGroupMapping.remove(workerGroup.getName());
+ dbWorkerGroupMapping.remove(workerGroup.getName());
}
}
}
@@ -112,8 +132,8 @@ public class WorkerClusters extends
AbstractClusterSubscribeListener<WorkerServe
.filter(Objects::nonNull)
.map(WorkerServerMetadata::getAddress)
.collect(Collectors.toList());
- synchronized (workerGroupMapping) {
- workerGroupMapping.put(workerGroup.getName(), activeWorkers);
+ synchronized (dbWorkerGroupMapping) {
+ dbWorkerGroupMapping.put(workerGroup.getName(), activeWorkers);
}
}
}
@@ -130,15 +150,15 @@ public class WorkerClusters extends
AbstractClusterSubscribeListener<WorkerServe
@Override
public void onServerAdded(WorkerServerMetadata workerServer) {
workerMapping.put(workerServer.getAddress(), workerServer);
- synchronized (workerGroupMapping) {
- List<String> addWorkerGroupAddrList =
workerGroupMapping.get(workerServer.getWorkerGroup());
+ synchronized (configWorkerGroupMapping) {
+ List<String> addWorkerGroupAddrList =
configWorkerGroupMapping.get(workerServer.getWorkerGroup());
if (addWorkerGroupAddrList == null) {
List<String> newWorkerGroupAddrList = new ArrayList<>();
newWorkerGroupAddrList.add(workerServer.getAddress());
- workerGroupMapping.put(workerServer.getWorkerGroup(),
newWorkerGroupAddrList);
+ configWorkerGroupMapping.put(workerServer.getWorkerGroup(),
newWorkerGroupAddrList);
} else if
(!addWorkerGroupAddrList.contains(workerServer.getAddress())) {
addWorkerGroupAddrList.add(workerServer.getAddress());
- workerGroupMapping.put(workerServer.getWorkerGroup(),
addWorkerGroupAddrList);
+ configWorkerGroupMapping.put(workerServer.getWorkerGroup(),
addWorkerGroupAddrList);
}
}
for (IClustersChangeListener<WorkerServerMetadata> listener :
workerClusterChangeListeners) {
@@ -149,12 +169,12 @@ public class WorkerClusters extends
AbstractClusterSubscribeListener<WorkerServe
@Override
public void onServerRemove(WorkerServerMetadata workerServer) {
workerMapping.remove(workerServer.getAddress(), workerServer);
- synchronized (workerGroupMapping) {
- List<String> removeWorkerGroupAddrList =
workerGroupMapping.get(workerServer.getWorkerGroup());
+ synchronized (configWorkerGroupMapping) {
+ List<String> removeWorkerGroupAddrList =
configWorkerGroupMapping.get(workerServer.getWorkerGroup());
if (removeWorkerGroupAddrList != null &&
removeWorkerGroupAddrList.contains(workerServer.getAddress())) {
removeWorkerGroupAddrList.remove(workerServer.getAddress());
if (removeWorkerGroupAddrList.isEmpty()) {
- workerGroupMapping.remove(workerServer.getWorkerGroup());
+
configWorkerGroupMapping.remove(workerServer.getWorkerGroup());
}
}
}
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClustersTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClustersTest.java
index 045e493d57..0be794c497 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClustersTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClustersTest.java
@@ -40,12 +40,12 @@ class WorkerClustersTest {
.addrList(normalWorkerServerMetadata.getAddress())
.build();
workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup));
-
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster"))
+
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster"))
.containsExactly(normalWorkerServerMetadata.getAddress());
workerClusters.onWorkerGroupDelete(Lists.newArrayList(workerGroup));
Truth.assertThat(workerClusters.containsWorkerGroup("flinkCluster")).isFalse();
-
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster")).isEmpty();
+
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster")).isEmpty();
}
@Test
@@ -59,7 +59,7 @@ class WorkerClustersTest {
.addrList(normalWorkerServerMetadata.getAddress())
.build();
workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup));
-
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster"))
+
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster"))
.containsExactly(normalWorkerServerMetadata.getAddress());
}
@@ -74,7 +74,7 @@ class WorkerClustersTest {
.addrList(normalWorkerServerMetadata.getAddress())
.build();
workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup));
-
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster"))
+
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster"))
.containsExactly(normalWorkerServerMetadata.getAddress());
WorkerGroup updatedWorkerGroup = WorkerGroup.builder()
@@ -82,7 +82,7 @@ class WorkerClustersTest {
.addrList("")
.build();
workerClusters.onWorkerGroupChange(Lists.newArrayList(updatedWorkerGroup));
-
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster")).isEmpty();
+
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster")).isEmpty();
assertThat(workerClusters.containsWorkerGroup("flinkCluster")).isTrue();
}
@@ -94,7 +94,7 @@ class WorkerClustersTest {
WorkerClusters workerClusters = new WorkerClusters();
workerClusters.onServerAdded(normalWorkerServerMetadata);
workerClusters.onServerAdded(busyWorkerServerMetadata);
- assertThat(workerClusters.getWorkerServerAddressByGroup("default"))
+ assertThat(workerClusters.getDbWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress(),
busyWorkerServerMetadata.getAddress());
assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress());
@@ -110,7 +110,7 @@ class WorkerClustersTest {
workerClusters.onServerAdded(busyWorkerServerMetadata);
workerClusters.onServerRemove(busyWorkerServerMetadata);
- assertThat(workerClusters.getWorkerServerAddressByGroup("default"))
+ assertThat(workerClusters.getDbWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress());
assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress());
@@ -137,7 +137,7 @@ class WorkerClustersTest {
workerClusters.onServerUpdate(workerServerMetadata);
- assertThat(workerClusters.getWorkerServerAddressByGroup("default"))
+ assertThat(workerClusters.getDbWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress(),
workerServerMetadata.getAddress());
assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default"))
.containsExactly(normalWorkerServerMetadata.getAddress(),
workerServerMetadata.getAddress());
diff --git
a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
index 31896727df..4f78b91c04 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
+++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
@@ -194,7 +194,7 @@ worker:
max-heartbeat-interval: 10s
# worker host weight to dispatch tasks, default value 100
host-weight: 100
- # worker group name
+ # worker group name. If it is not set, the default value is default.
group: default
server-load-protection:
enabled: true
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index c92caa79c0..5f3d3cddf6 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -73,6 +73,11 @@ public class WorkerConfig implements Validator {
workerConfig.setWorkerRegistryPath(
RegistryNodeType.WORKER.getRegistryPath() + "/" +
workerConfig.getWorkerAddress());
+
+ if (StringUtils.isEmpty(group)) {
+ workerConfig.setGroup("default");
+ }
+
printConfig();
}
@@ -87,6 +92,7 @@ public class WorkerConfig implements Validator {
"\n address -> " + workerAddress +
"\n registry-path: " + workerRegistryPath +
"\n physical-task-config -> " + physicalTaskConfig +
+ "\n group -> " + group +
"\n****************************Worker
Configuration**************************************";
log.info(config);
}