This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 25108c84e6 [Improvement-16982][Master] When master startup, initialize the cluster from registry (#16983)
25108c84e6 is described below

commit 25108c84e639e086eba9e97c8432974138f78875
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Jan 24 14:15:52 2025 +0800

    [Improvement-16982][Master] When master startup, initialize the cluster from registry (#16983)
---
 .../server/master/cluster/ClusterManager.java      | 45 ++++++++++++++++-
 .../master/cluster/IMasterSlotChangeListener.java  | 25 ++++++++++
 .../master/cluster/MasterServerMetadata.java       |  3 ++
 .../cluster/MasterSlotChangeListenerAdaptor.java   | 56 ++++++++++++++++++++++
 .../server/master/cluster/MasterSlotManager.java   | 22 +--------
 .../master/cluster/MasterSlotManagerTest.java      | 19 ++++----
 6 files changed, 139 insertions(+), 31 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java
index 3f875f7047..3c6f9f1ba0 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java
@@ -17,6 +17,9 @@
 
 package org.apache.dolphinscheduler.server.master.cluster;
 
+import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.registry.api.RegistryClient;
 import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
 
@@ -36,6 +39,9 @@ public class ClusterManager {
     @Getter
     private WorkerClusters workerClusters;
 
+    @Autowired
+    private MasterSlotManager masterSlotManager;
+
     @Autowired
     private WorkerGroupChangeNotifier workerGroupChangeNotifier;
 
@@ -48,11 +54,48 @@ public class ClusterManager {
     }
 
     public void start() {
+        initializeMasterClusters();
+        initializeWorkerClusters();
+        log.info("ClusterManager started...");
+    }
+
+    /**
+     * Initialize the master clusters.
+     * <p> 1. Register master slot listener once master clusters changed.
+     * <p> 2. Fetch master nodes from registry.
+     * <p> 3. Subscribe the master change event.
+     */
+    private void initializeMasterClusters() {
+        this.masterClusters.registerListener(new MasterSlotChangeListenerAdaptor(masterSlotManager, masterClusters));
+
+        registryClient.getServerList(RegistryNodeType.MASTER).forEach(server -> {
+            final MasterHeartBeat masterHeartBeat =
+                    JSONUtils.parseObject(server.getHeartBeatInfo(), MasterHeartBeat.class);
+            masterClusters.onServerAdded(MasterServerMetadata.parseFromHeartBeat(masterHeartBeat));
+        });
+        log.info("Initialized MasterClusters: {}", JSONUtils.toPrettyJsonString(masterClusters.getServers()));
+
         this.registryClient.subscribe(RegistryNodeType.MASTER.getRegistryPath(), masterClusters);
+    }
+
+    /**
+     * Initialize the worker clusters.
+     * <p> 1. Fetch worker nodes from registry.
+     * <p> 2. Register worker group change notifier once worker clusters changed.
+     * <p> 3. Subscribe the worker change event.
+     */
+    private void initializeWorkerClusters() {
+        registryClient.getServerList(RegistryNodeType.WORKER).forEach(server -> {
+            final WorkerHeartBeat workerHeartBeat =
+                    JSONUtils.parseObject(server.getHeartBeatInfo(), WorkerHeartBeat.class);
+            workerClusters.onServerAdded(WorkerServerMetadata.parseFromHeartBeat(workerHeartBeat));
+        });
+        log.info("Initialized WorkerClusters: {}", JSONUtils.toPrettyJsonString(workerClusters.getServers()));
+
         this.registryClient.subscribe(RegistryNodeType.WORKER.getRegistryPath(), workerClusters);
+
         this.workerGroupChangeNotifier.subscribeWorkerGroupsChange(workerClusters);
         this.workerGroupChangeNotifier.start();
-        log.info("ClusterManager started...");
     }
 
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/IMasterSlotChangeListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/IMasterSlotChangeListener.java
new file mode 100644
index 0000000000..8918fa794d
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/IMasterSlotChangeListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.cluster;
+
+import java.util.List;
+
+public interface IMasterSlotChangeListener {
+
+    void onMasterSlotChanged(final List<MasterServerMetadata> normalMasterServers);
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
index f68e13d7e9..3ec7c35efd 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.server.master.cluster;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
 
@@ -32,6 +34,7 @@ import lombok.experimental.SuperBuilder;
 public class MasterServerMetadata extends BaseServerMetadata implements Comparable<MasterServerMetadata> {
 
     public static MasterServerMetadata parseFromHeartBeat(final MasterHeartBeat masterHeartBeat) {
+        checkNotNull(masterHeartBeat);
         return MasterServerMetadata.builder()
                 .processId(masterHeartBeat.getProcessId())
                 .serverStartupTime(masterHeartBeat.getStartupTime())
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotChangeListenerAdaptor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotChangeListenerAdaptor.java
new file mode 100644
index 0000000000..be280f1ed5
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotChangeListenerAdaptor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.cluster;
+
+import java.util.List;
+
+public class MasterSlotChangeListenerAdaptor
+        implements
+            IMasterSlotChangeListener,
+            IClusters.IClustersChangeListener<MasterServerMetadata> {
+
+    private final MasterSlotManager masterSlotManager;
+
+    private final MasterClusters masterClusters;
+
+    public MasterSlotChangeListenerAdaptor(final MasterSlotManager masterSlotManager,
+                                           final MasterClusters masterClusters) {
+        this.masterSlotManager = masterSlotManager;
+        this.masterClusters = masterClusters;
+    }
+
+    @Override
+    public void onMasterSlotChanged(final List<MasterServerMetadata> normalMasterServers) {
+        masterSlotManager.doReBalance(normalMasterServers);
+    }
+
+    @Override
+    public void onServerAdded(MasterServerMetadata server) {
+        onMasterSlotChanged(masterClusters.getNormalServers());
+    }
+
+    @Override
+    public void onServerRemove(MasterServerMetadata server) {
+        onMasterSlotChanged(masterClusters.getNormalServers());
+    }
+
+    @Override
+    public void onServerUpdate(MasterServerMetadata server) {
+        onMasterSlotChanged(masterClusters.getNormalServers());
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManager.java
index 4e619b2fcc..67660bd038 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManager.java
@@ -29,34 +29,14 @@ import org.springframework.stereotype.Component;
 @Component
 public class MasterSlotManager implements IMasterSlotReBalancer {
 
-    private final MasterClusters masterClusters;
-
     private final MasterConfig masterConfig;
 
     private volatile int currentSlot = -1;
 
     private volatile int totalSlots = 0;
 
-    public MasterSlotManager(ClusterManager clusterManager, MasterConfig masterConfig) {
+    public MasterSlotManager(final MasterConfig masterConfig) {
         this.masterConfig = masterConfig;
-        this.masterClusters = clusterManager.getMasterClusters();
-        this.masterClusters.registerListener(new IClusters.IClustersChangeListener<MasterServerMetadata>() {
-
-            @Override
-            public void onServerAdded(MasterServerMetadata server) {
-                doReBalance(masterClusters.getNormalServers());
-            }
-
-            @Override
-            public void onServerRemove(MasterServerMetadata server) {
-                doReBalance(masterClusters.getNormalServers());
-            }
-
-            @Override
-            public void onServerUpdate(MasterServerMetadata server) {
-                doReBalance(masterClusters.getNormalServers());
-            }
-        });
     }
 
     /**
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManagerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManagerTest.java
index 56d119d776..0b4f659ebc 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManagerTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManagerTest.java
@@ -29,16 +29,16 @@ class MasterSlotManagerTest {
 
     private MasterSlotManager masterSlotManager;
 
-    private ClusterManager clusterManager;
+    private MasterClusters masterClusters;
 
     private MasterConfig masterConfig;
 
     @BeforeEach
     public void setUp() {
-        clusterManager = new ClusterManager();
+        masterClusters = new MasterClusters();
         masterConfig = new MasterConfig();
         masterConfig.setMasterAddress("127.0.0.1:5678");
-        masterSlotManager = new MasterSlotManager(clusterManager, masterConfig);
+        masterSlotManager = new MasterSlotManager(masterConfig);
         MasterServerMetadata master1 = MasterServerMetadata.builder()
                 .cpuUsage(0.2)
                 .memoryUsage(0.4)
@@ -63,10 +63,11 @@ class MasterSlotManagerTest {
                 .serverStatus(ServerStatus.BUSY)
                 .address("127.0.0.4:5679")
                 .build();
-        clusterManager.getMasterClusters().onServerAdded(master1);
-        clusterManager.getMasterClusters().onServerAdded(master2);
-        clusterManager.getMasterClusters().onServerAdded(master3);
-        clusterManager.getMasterClusters().onServerAdded(master4);
+        this.masterClusters.registerListener(new MasterSlotChangeListenerAdaptor(masterSlotManager, masterClusters));
+        masterClusters.onServerAdded(master1);
+        masterClusters.onServerAdded(master2);
+        masterClusters.onServerAdded(master3);
+        masterClusters.onServerAdded(master4);
     }
 
     @Test
@@ -98,8 +99,8 @@ class MasterSlotManagerTest {
                 .serverStatus(ServerStatus.BUSY)
                 .address("127.0.0.4:5679")
                 .build();
-        clusterManager.getMasterClusters().onServerRemove(master2);
-        clusterManager.getMasterClusters().onServerRemove(master3);
+        masterClusters.onServerRemove(master2);
+        masterClusters.onServerRemove(master3);
         // After doReBalance, the total master slots should be 2
         assertThat(masterSlotManager.getTotalMasterSlots()).isEqualTo(2);
     }

Reply via email to