This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 25108c84e6 [Improvement-16982][Master] When master startup, initialize
the cluster from registry (#16983)
25108c84e6 is described below
commit 25108c84e639e086eba9e97c8432974138f78875
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Jan 24 14:15:52 2025 +0800
[Improvement-16982][Master] When master startup, initialize the cluster
from registry (#16983)
---
.../server/master/cluster/ClusterManager.java | 45 ++++++++++++++++-
.../master/cluster/IMasterSlotChangeListener.java | 25 ++++++++++
.../master/cluster/MasterServerMetadata.java | 3 ++
.../cluster/MasterSlotChangeListenerAdaptor.java | 56 ++++++++++++++++++++++
.../server/master/cluster/MasterSlotManager.java | 22 +--------
.../master/cluster/MasterSlotManagerTest.java | 19 ++++----
6 files changed, 139 insertions(+), 31 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java
index 3f875f7047..3c6f9f1ba0 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java
@@ -17,6 +17,9 @@
package org.apache.dolphinscheduler.server.master.cluster;
+import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
@@ -36,6 +39,9 @@ public class ClusterManager {
@Getter
private WorkerClusters workerClusters;
+ @Autowired
+ private MasterSlotManager masterSlotManager;
+
@Autowired
private WorkerGroupChangeNotifier workerGroupChangeNotifier;
@@ -48,11 +54,48 @@ public class ClusterManager {
}
public void start() {
+ initializeMasterClusters();
+ initializeWorkerClusters();
+ log.info("ClusterManager started...");
+ }
+
+ /**
+ * Initialize the master clusters.
+ * <p> 1. Register master slot listener once master clusters changed.
+ * <p> 2. Fetch master nodes from registry.
+ * <p> 3. Subscribe the master change event.
+ */
+ private void initializeMasterClusters() {
+ this.masterClusters.registerListener(new
MasterSlotChangeListenerAdaptor(masterSlotManager, masterClusters));
+
+ registryClient.getServerList(RegistryNodeType.MASTER).forEach(server -> {
+ final MasterHeartBeat masterHeartBeat =
+ JSONUtils.parseObject(server.getHeartBeatInfo(),
MasterHeartBeat.class);
+
masterClusters.onServerAdded(MasterServerMetadata.parseFromHeartBeat(masterHeartBeat));
+ });
+ log.info("Initialized MasterClusters: {}",
JSONUtils.toPrettyJsonString(masterClusters.getServers()));
+
this.registryClient.subscribe(RegistryNodeType.MASTER.getRegistryPath(),
masterClusters);
+ }
+
+ /**
+ * Initialize the worker clusters.
+ * <p> 1. Fetch worker nodes from registry.
+ * <p> 2. Register worker group change notifier once worker clusters
changed.
+ * <p> 3. Subscribe the worker change event.
+ */
+ private void initializeWorkerClusters() {
+ registryClient.getServerList(RegistryNodeType.WORKER).forEach(server -> {
+ final WorkerHeartBeat workerHeartBeat =
+ JSONUtils.parseObject(server.getHeartBeatInfo(),
WorkerHeartBeat.class);
+
workerClusters.onServerAdded(WorkerServerMetadata.parseFromHeartBeat(workerHeartBeat));
+ });
+ log.info("Initialized WorkerClusters: {}",
JSONUtils.toPrettyJsonString(workerClusters.getServers()));
+
this.registryClient.subscribe(RegistryNodeType.WORKER.getRegistryPath(),
workerClusters);
+
this.workerGroupChangeNotifier.subscribeWorkerGroupsChange(workerClusters);
this.workerGroupChangeNotifier.start();
- log.info("ClusterManager started...");
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/IMasterSlotChangeListener.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/IMasterSlotChangeListener.java
new file mode 100644
index 0000000000..8918fa794d
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/IMasterSlotChangeListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.cluster;
+
+import java.util.List;
+
+public interface IMasterSlotChangeListener {
+
+ void onMasterSlotChanged(final List<MasterServerMetadata>
normalMasterServers);
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
index f68e13d7e9..3ec7c35efd 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.master.cluster;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
@@ -32,6 +34,7 @@ import lombok.experimental.SuperBuilder;
public class MasterServerMetadata extends BaseServerMetadata implements
Comparable<MasterServerMetadata> {
public static MasterServerMetadata parseFromHeartBeat(final
MasterHeartBeat masterHeartBeat) {
+ checkNotNull(masterHeartBeat);
return MasterServerMetadata.builder()
.processId(masterHeartBeat.getProcessId())
.serverStartupTime(masterHeartBeat.getStartupTime())
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotChangeListenerAdaptor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotChangeListenerAdaptor.java
new file mode 100644
index 0000000000..be280f1ed5
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotChangeListenerAdaptor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.cluster;
+
+import java.util.List;
+
+public class MasterSlotChangeListenerAdaptor
+ implements
+ IMasterSlotChangeListener,
+ IClusters.IClustersChangeListener<MasterServerMetadata> {
+
+ private final MasterSlotManager masterSlotManager;
+
+ private final MasterClusters masterClusters;
+
+ public MasterSlotChangeListenerAdaptor(final MasterSlotManager
masterSlotManager,
+ final MasterClusters
masterClusters) {
+ this.masterSlotManager = masterSlotManager;
+ this.masterClusters = masterClusters;
+ }
+
+ @Override
+ public void onMasterSlotChanged(final List<MasterServerMetadata>
normalMasterServers) {
+ masterSlotManager.doReBalance(normalMasterServers);
+ }
+
+ @Override
+ public void onServerAdded(MasterServerMetadata server) {
+ onMasterSlotChanged(masterClusters.getNormalServers());
+ }
+
+ @Override
+ public void onServerRemove(MasterServerMetadata server) {
+ onMasterSlotChanged(masterClusters.getNormalServers());
+ }
+
+ @Override
+ public void onServerUpdate(MasterServerMetadata server) {
+ onMasterSlotChanged(masterClusters.getNormalServers());
+ }
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManager.java
index 4e619b2fcc..67660bd038 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManager.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManager.java
@@ -29,34 +29,14 @@ import org.springframework.stereotype.Component;
@Component
public class MasterSlotManager implements IMasterSlotReBalancer {
- private final MasterClusters masterClusters;
-
private final MasterConfig masterConfig;
private volatile int currentSlot = -1;
private volatile int totalSlots = 0;
- public MasterSlotManager(ClusterManager clusterManager, MasterConfig
masterConfig) {
+ public MasterSlotManager(final MasterConfig masterConfig) {
this.masterConfig = masterConfig;
- this.masterClusters = clusterManager.getMasterClusters();
- this.masterClusters.registerListener(new
IClusters.IClustersChangeListener<MasterServerMetadata>() {
-
- @Override
- public void onServerAdded(MasterServerMetadata server) {
- doReBalance(masterClusters.getNormalServers());
- }
-
- @Override
- public void onServerRemove(MasterServerMetadata server) {
- doReBalance(masterClusters.getNormalServers());
- }
-
- @Override
- public void onServerUpdate(MasterServerMetadata server) {
- doReBalance(masterClusters.getNormalServers());
- }
- });
}
/**
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManagerTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManagerTest.java
index 56d119d776..0b4f659ebc 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManagerTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManagerTest.java
@@ -29,16 +29,16 @@ class MasterSlotManagerTest {
private MasterSlotManager masterSlotManager;
- private ClusterManager clusterManager;
+ private MasterClusters masterClusters;
private MasterConfig masterConfig;
@BeforeEach
public void setUp() {
- clusterManager = new ClusterManager();
+ masterClusters = new MasterClusters();
masterConfig = new MasterConfig();
masterConfig.setMasterAddress("127.0.0.1:5678");
- masterSlotManager = new MasterSlotManager(clusterManager,
masterConfig);
+ masterSlotManager = new MasterSlotManager(masterConfig);
MasterServerMetadata master1 = MasterServerMetadata.builder()
.cpuUsage(0.2)
.memoryUsage(0.4)
@@ -63,10 +63,11 @@ class MasterSlotManagerTest {
.serverStatus(ServerStatus.BUSY)
.address("127.0.0.4:5679")
.build();
- clusterManager.getMasterClusters().onServerAdded(master1);
- clusterManager.getMasterClusters().onServerAdded(master2);
- clusterManager.getMasterClusters().onServerAdded(master3);
- clusterManager.getMasterClusters().onServerAdded(master4);
+ this.masterClusters.registerListener(new
MasterSlotChangeListenerAdaptor(masterSlotManager, masterClusters));
+ masterClusters.onServerAdded(master1);
+ masterClusters.onServerAdded(master2);
+ masterClusters.onServerAdded(master3);
+ masterClusters.onServerAdded(master4);
}
@Test
@@ -98,8 +99,8 @@ class MasterSlotManagerTest {
.serverStatus(ServerStatus.BUSY)
.address("127.0.0.4:5679")
.build();
- clusterManager.getMasterClusters().onServerRemove(master2);
- clusterManager.getMasterClusters().onServerRemove(master3);
+ masterClusters.onServerRemove(master2);
+ masterClusters.onServerRemove(master3);
// After doReBalance, the total master slots should be 2
assertThat(masterSlotManager.getTotalMasterSlots()).isEqualTo(2);
}