This is an automated email from the ASF dual-hosted git repository.

gallardot pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 566651c6c5 [Improvement-16880] Merge worker group from config and ui and distinct display it in api  (#16883)
566651c6c5 is described below

commit 566651c6c58672deaf438d29fcf2f644c67a9160
Author: xiangzihao <[email protected]>
AuthorDate: Mon Dec 9 13:27:24 2024 +0800

    [Improvement-16880] Merge worker group from config and ui and distinct display it in api  (#16883)
---
 .../ProjectWorkerGroupRelationServiceImpl.java     |  2 +-
 .../api/service/impl/WorkerGroupServiceImpl.java   |  3 +-
 .../server/master/cluster/WorkerClusters.java      | 56 +++++++++++++++-------
 .../server/master/cluster/WorkerClustersTest.java  | 16 +++----
 .../src/main/resources/application.yaml            |  2 +-
 .../server/worker/config/WorkerConfig.java         |  6 +++
 6 files changed, 56 insertions(+), 29 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java
index 3e6e4617f5..94c6eec53b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java
@@ -187,7 +187,7 @@ public class ProjectWorkerGroupRelationServiceImpl extends BaseServiceImpl
             projectWorkerGroup.setProjectCode(projectCode);
             projectWorkerGroup.setWorkerGroup(workerGroup);
             return projectWorkerGroup;
-        }).collect(Collectors.toList());
+        }).distinct().collect(Collectors.toList());
 
         result.put(Constants.DATA_LIST, projectWorkerGroups);
         putMsg(result, Status.SUCCESS);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index 52db3bb7b0..eefe307670 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -303,7 +303,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
                 .map(WorkerGroup::getName)
                 .collect(Collectors.toList());
         availableWorkerGroupList.addAll(configWorkerGroupNames);
-        result.put(Constants.DATA_LIST, availableWorkerGroupList);
+        result.put(Constants.DATA_LIST, 
availableWorkerGroupList.stream().distinct().collect(Collectors.toList()));
         putMsg(result, Status.SUCCESS);
         return result;
     }
@@ -362,6 +362,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
         }
 
         workerGroupDao.deleteById(id);
+        boardCastToMasterThatWorkerGroupChanged();
 
         log.info("Delete worker group complete, workerGroupName:{}.", 
workerGroup.getName());
         putMsg(result, Status.SUCCESS);
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java
index 509c54f3fb..1716e78b07 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java
@@ -43,8 +43,11 @@ public class WorkerClusters extends AbstractClusterSubscribeListener<WorkerServe
     // WorkerIdentifier(workerAddress) -> worker
     private final Map<String, WorkerServerMetadata> workerMapping = new 
ConcurrentHashMap<>();
 
-    // WorkerGroup -> WorkerIdentifier(workerAddress)
-    private final Map<String, List<String>> workerGroupMapping = new 
ConcurrentHashMap<>();
+    // WorkerGroup from db -> WorkerIdentifier(workerAddress)
+    private final Map<String, List<String>> dbWorkerGroupMapping = new 
ConcurrentHashMap<>();
+
+    // WorkerGroup from config -> WorkerIdentifier(workerAddress)
+    private final Map<String, List<String>> configWorkerGroupMapping = new 
ConcurrentHashMap<>();
 
     private final List<IClustersChangeListener<WorkerServerMetadata>> 
workerClusterChangeListeners =
             new CopyOnWriteArrayList<>();
@@ -59,27 +62,44 @@ public class WorkerClusters extends AbstractClusterSubscribeListener<WorkerServe
         return Optional.ofNullable(workerMapping.get(address));
     }
 
-    public List<String> getWorkerServerAddressByGroup(String workerGroup) {
+    public List<String> getDbWorkerServerAddressByGroup(String workerGroup) {
+        if (WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)) {
+            return UnmodifiableList.unmodifiableList(new 
ArrayList<>(workerMapping.keySet()));
+        }
+        return dbWorkerGroupMapping.getOrDefault(workerGroup, 
Collections.emptyList());
+    }
+
+    public List<String> getConfigWorkerServerAddressByGroup(String 
workerGroup) {
         if (WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)) {
             return UnmodifiableList.unmodifiableList(new 
ArrayList<>(workerMapping.keySet()));
         }
-        return workerGroupMapping.getOrDefault(workerGroup, 
Collections.emptyList());
+        return configWorkerGroupMapping.getOrDefault(workerGroup, 
Collections.emptyList());
     }
 
     public List<String> getNormalWorkerServerAddressByGroup(String 
workerGroup) {
-        List<String> normalWorkerAddresses = 
getWorkerServerAddressByGroup(workerGroup)
+        List<String> dbWorkerAddresses = 
getDbWorkerServerAddressByGroup(workerGroup)
+                .stream()
+                .map(workerMapping::get)
+                .filter(Objects::nonNull)
+                .filter(workerServer -> workerServer.getServerStatus() == 
ServerStatus.NORMAL)
+                .map(WorkerServerMetadata::getAddress)
+                .collect(Collectors.toList());
+        List<String> configWorkerAddresses = 
getConfigWorkerServerAddressByGroup(workerGroup)
                 .stream()
                 .map(workerMapping::get)
                 .filter(Objects::nonNull)
                 .filter(workerServer -> workerServer.getServerStatus() == 
ServerStatus.NORMAL)
                 .map(WorkerServerMetadata::getAddress)
                 .collect(Collectors.toList());
-        return UnmodifiableList.unmodifiableList(normalWorkerAddresses);
+        dbWorkerAddresses.removeAll(configWorkerAddresses);
+        dbWorkerAddresses.addAll(configWorkerAddresses);
+        return UnmodifiableList.unmodifiableList(dbWorkerAddresses);
     }
 
     public boolean containsWorkerGroup(String workerGroup) {
         return WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)
-                || workerGroupMapping.containsKey(workerGroup);
+                || dbWorkerGroupMapping.containsKey(workerGroup)
+                || configWorkerGroupMapping.containsKey(workerGroup);
     }
 
     @Override
@@ -89,9 +109,9 @@ public class WorkerClusters extends AbstractClusterSubscribeListener<WorkerServe
 
     @Override
     public void onWorkerGroupDelete(List<WorkerGroup> workerGroups) {
-        synchronized (workerGroupMapping) {
+        synchronized (dbWorkerGroupMapping) {
             for (WorkerGroup workerGroup : workerGroups) {
-                workerGroupMapping.remove(workerGroup.getName());
+                dbWorkerGroupMapping.remove(workerGroup.getName());
             }
         }
     }
@@ -112,8 +132,8 @@ public class WorkerClusters extends AbstractClusterSubscribeListener<WorkerServe
                     .filter(Objects::nonNull)
                     .map(WorkerServerMetadata::getAddress)
                     .collect(Collectors.toList());
-            synchronized (workerGroupMapping) {
-                workerGroupMapping.put(workerGroup.getName(), activeWorkers);
+            synchronized (dbWorkerGroupMapping) {
+                dbWorkerGroupMapping.put(workerGroup.getName(), activeWorkers);
             }
         }
     }
@@ -130,15 +150,15 @@ public class WorkerClusters extends AbstractClusterSubscribeListener<WorkerServe
     @Override
     public void onServerAdded(WorkerServerMetadata workerServer) {
         workerMapping.put(workerServer.getAddress(), workerServer);
-        synchronized (workerGroupMapping) {
-            List<String> addWorkerGroupAddrList = 
workerGroupMapping.get(workerServer.getWorkerGroup());
+        synchronized (configWorkerGroupMapping) {
+            List<String> addWorkerGroupAddrList = 
configWorkerGroupMapping.get(workerServer.getWorkerGroup());
             if (addWorkerGroupAddrList == null) {
                 List<String> newWorkerGroupAddrList = new ArrayList<>();
                 newWorkerGroupAddrList.add(workerServer.getAddress());
-                workerGroupMapping.put(workerServer.getWorkerGroup(), 
newWorkerGroupAddrList);
+                configWorkerGroupMapping.put(workerServer.getWorkerGroup(), 
newWorkerGroupAddrList);
             } else if 
(!addWorkerGroupAddrList.contains(workerServer.getAddress())) {
                 addWorkerGroupAddrList.add(workerServer.getAddress());
-                workerGroupMapping.put(workerServer.getWorkerGroup(), 
addWorkerGroupAddrList);
+                configWorkerGroupMapping.put(workerServer.getWorkerGroup(), 
addWorkerGroupAddrList);
             }
         }
         for (IClustersChangeListener<WorkerServerMetadata> listener : 
workerClusterChangeListeners) {
@@ -149,12 +169,12 @@ public class WorkerClusters extends AbstractClusterSubscribeListener<WorkerServe
     @Override
     public void onServerRemove(WorkerServerMetadata workerServer) {
         workerMapping.remove(workerServer.getAddress(), workerServer);
-        synchronized (workerGroupMapping) {
-            List<String> removeWorkerGroupAddrList = 
workerGroupMapping.get(workerServer.getWorkerGroup());
+        synchronized (configWorkerGroupMapping) {
+            List<String> removeWorkerGroupAddrList = 
configWorkerGroupMapping.get(workerServer.getWorkerGroup());
             if (removeWorkerGroupAddrList != null && 
removeWorkerGroupAddrList.contains(workerServer.getAddress())) {
                 removeWorkerGroupAddrList.remove(workerServer.getAddress());
                 if (removeWorkerGroupAddrList.isEmpty()) {
-                    workerGroupMapping.remove(workerServer.getWorkerGroup());
+                    
configWorkerGroupMapping.remove(workerServer.getWorkerGroup());
                 }
             }
         }
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClustersTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClustersTest.java
index 045e493d57..0be794c497 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClustersTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClustersTest.java
@@ -40,12 +40,12 @@ class WorkerClustersTest {
                 .addrList(normalWorkerServerMetadata.getAddress())
                 .build();
         workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup));
-        
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster"))
+        
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster"))
                 .containsExactly(normalWorkerServerMetadata.getAddress());
 
         workerClusters.onWorkerGroupDelete(Lists.newArrayList(workerGroup));
         
Truth.assertThat(workerClusters.containsWorkerGroup("flinkCluster")).isFalse();
-        
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster")).isEmpty();
+        
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster")).isEmpty();
     }
 
     @Test
@@ -59,7 +59,7 @@ class WorkerClustersTest {
                 .addrList(normalWorkerServerMetadata.getAddress())
                 .build();
         workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup));
-        
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster"))
+        
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster"))
                 .containsExactly(normalWorkerServerMetadata.getAddress());
     }
 
@@ -74,7 +74,7 @@ class WorkerClustersTest {
                 .addrList(normalWorkerServerMetadata.getAddress())
                 .build();
         workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup));
-        
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster"))
+        
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster"))
                 .containsExactly(normalWorkerServerMetadata.getAddress());
 
         WorkerGroup updatedWorkerGroup = WorkerGroup.builder()
@@ -82,7 +82,7 @@ class WorkerClustersTest {
                 .addrList("")
                 .build();
         
workerClusters.onWorkerGroupChange(Lists.newArrayList(updatedWorkerGroup));
-        
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster")).isEmpty();
+        
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster")).isEmpty();
         
assertThat(workerClusters.containsWorkerGroup("flinkCluster")).isTrue();
     }
 
@@ -94,7 +94,7 @@ class WorkerClustersTest {
         WorkerClusters workerClusters = new WorkerClusters();
         workerClusters.onServerAdded(normalWorkerServerMetadata);
         workerClusters.onServerAdded(busyWorkerServerMetadata);
-        assertThat(workerClusters.getWorkerServerAddressByGroup("default"))
+        assertThat(workerClusters.getDbWorkerServerAddressByGroup("default"))
                 .containsExactly(normalWorkerServerMetadata.getAddress(), 
busyWorkerServerMetadata.getAddress());
         
assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default"))
                 .containsExactly(normalWorkerServerMetadata.getAddress());
@@ -110,7 +110,7 @@ class WorkerClustersTest {
         workerClusters.onServerAdded(busyWorkerServerMetadata);
         workerClusters.onServerRemove(busyWorkerServerMetadata);
 
-        assertThat(workerClusters.getWorkerServerAddressByGroup("default"))
+        assertThat(workerClusters.getDbWorkerServerAddressByGroup("default"))
                 .containsExactly(normalWorkerServerMetadata.getAddress());
         
assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default"))
                 .containsExactly(normalWorkerServerMetadata.getAddress());
@@ -137,7 +137,7 @@ class WorkerClustersTest {
 
         workerClusters.onServerUpdate(workerServerMetadata);
 
-        assertThat(workerClusters.getWorkerServerAddressByGroup("default"))
+        assertThat(workerClusters.getDbWorkerServerAddressByGroup("default"))
                 .containsExactly(normalWorkerServerMetadata.getAddress(), 
workerServerMetadata.getAddress());
         
assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default"))
                 .containsExactly(normalWorkerServerMetadata.getAddress(), 
workerServerMetadata.getAddress());
diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
index 31896727df..4f78b91c04 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
+++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
@@ -194,7 +194,7 @@ worker:
   max-heartbeat-interval: 10s
   # worker host weight to dispatch tasks, default value 100
   host-weight: 100
-  # worker group name
+  # worker group name. If it is not set, the default value is default.
   group: default
   server-load-protection:
     enabled: true
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index c92caa79c0..5f3d3cddf6 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -73,6 +73,11 @@ public class WorkerConfig implements Validator {
 
         workerConfig.setWorkerRegistryPath(
                 RegistryNodeType.WORKER.getRegistryPath() + "/" + 
workerConfig.getWorkerAddress());
+
+        if (StringUtils.isEmpty(group)) {
+            workerConfig.setGroup("default");
+        }
+
         printConfig();
     }
 
@@ -87,6 +92,7 @@ public class WorkerConfig implements Validator {
                         "\n  address -> " + workerAddress +
                         "\n  registry-path: " + workerRegistryPath +
                         "\n  physical-task-config -> " + physicalTaskConfig +
+                        "\n  group -> " + group +
                         "\n****************************Worker 
Configuration**************************************";
         log.info(config);
     }

Reply via email to