This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 72daba5  [DS-6694][Master] Reduce the refresh resource interval of 
LowerWeight (#6695)
72daba5 is described below

commit 72daba58cd378a347e9657b994bb631c0704fc2a
Author: wind <[email protected]>
AuthorDate: Fri Nov 5 19:10:36 2021 +0800

    [DS-6694][Master] Reduce the refresh resource interval of LowerWeight 
(#6695)
    
    * [DS-6694][Master] Reduce the refresh resource interval of LowerWeight
    
    * add server node update event handle
    
    Co-authored-by: caishunfeng <[email protected]>
---
 .../dolphinscheduler/common/utils/HeartBeat.java   |  1 +
 .../registry/zookeeper/ZookeeperRegistry.java      |  4 +--
 .../registry/zookeeper/ZookeeperRegistryTest.java  |  2 +-
 .../dispatch/host/LowerWeightHostManager.java      |  2 +-
 .../registry/MasterRegistryDataListener.java       |  3 +-
 .../server/master/registry/ServerNodeManager.java  | 35 +++++++++++++++++++---
 .../spi/register/ListenerManager.java              |  4 +--
 .../spi/register/SubscribeListener.java            |  2 +-
 8 files changed, 39 insertions(+), 14 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
index bec0f75..d28cd3d 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
@@ -184,6 +184,7 @@ public class HeartBeat {
      * update server state
      */
     public void updateServerState() {
+        this.reportTime = System.currentTimeMillis();
         if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < 
reservedMemory) {
             logger.warn("current cpu load average {} is too high or available 
memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
                     loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, 
reservedMemory);
diff --git 
a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
 
b/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
index 64b0b13..e84666a 100644
--- 
a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
+++ 
b/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
@@ -148,14 +148,12 @@ public class ZookeeperRegistry implements Registry {
             String dataPath = null;
             switch (type) {
                 case NODE_ADDED:
-
                     dataPath = event.getData().getPath();
                     eventType = DataChangeEvent.ADD;
                     break;
                 case NODE_UPDATED:
                     eventType = DataChangeEvent.UPDATE;
                     dataPath = event.getData().getPath();
-
                     break;
                 case NODE_REMOVED:
                     eventType = DataChangeEvent.REMOVE;
@@ -164,7 +162,7 @@ public class ZookeeperRegistry implements Registry {
                 default:
             }
             if (null != eventType && null != dataPath) {
-                ListenerManager.dataChange(path, dataPath, eventType);
+                ListenerManager.dataChange(path, dataPath, new 
String(event.getData().getData()), eventType);
             }
         };
         treeCache.getListenable().addListener(treeCacheListener);
diff --git 
a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
 
b/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
index 900c7e4..a5dc33b 100644
--- 
a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
+++ 
b/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
@@ -115,7 +115,7 @@ public class ZookeeperRegistryTest {
     class TestListener implements SubscribeListener {
 
         @Override
-        public void notify(String path, DataChangeEvent dataChangeEvent) {
+        public void notify(String path, String data, DataChangeEvent 
dataChangeEvent) {
             logger.info("I'm test listener");
         }
     }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index f78b957..64a15b5 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -78,7 +78,7 @@ public class LowerWeightHostManager extends CommonHostManager 
{
         this.workerHostWeightsMap = new ConcurrentHashMap<>();
         this.lock = new ReentrantLock();
         this.executorService = Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory("LowerWeightHostManagerExecutor"));
-        this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(), 
0, 5, TimeUnit.SECONDS);
+        this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(), 
0, 1, TimeUnit.SECONDS);
     }
 
     @PreDestroy
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
index c712ac0..4fd50a0 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
@@ -39,9 +39,8 @@ public class MasterRegistryDataListener implements 
SubscribeListener {
         masterRegistryClient = 
SpringApplicationContext.getBean(MasterRegistryClient.class);
     }
 
-
     @Override
-    public void notify(String path, DataChangeEvent event) {
+    public void notify(String path, String data, DataChangeEvent event) {
         //monitor master
         if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS + 
Constants.SINGLE_SLASH)) {
             handleMasterEvent(event, path);
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index 09f4cc2..1bceeb7 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -190,7 +190,7 @@ public class ServerNodeManager implements InitializingBean {
         public void run() {
             // sync worker node info
             Map<String, String> newWorkerNodeInfo = 
registryClient.getServerMaps(NodeType.WORKER, true);
-            syncWorkerNodeInfo(newWorkerNodeInfo);
+            syncAllWorkerNodeInfo(newWorkerNodeInfo);
 
             // sync worker group nodes from database
             List<WorkerGroup> workerGroupList = 
workerGroupMapper.queryAllWorkerGroup();
@@ -218,7 +218,7 @@ public class ServerNodeManager implements InitializingBean {
     class WorkerDataListener implements SubscribeListener {
 
         @Override
-        public void notify(String path, DataChangeEvent dataChangeEvent) {
+        public void notify(String path, String data, DataChangeEvent 
dataChangeEvent) {
             if (registryClient.isWorkerPath(path)) {
                 try {
                     if (dataChangeEvent == DataChangeEvent.ADD) {
@@ -233,6 +233,14 @@ public class ServerNodeManager implements InitializingBean 
{
                         Set<String> currentNodes = 
registryClient.getWorkerGroupNodesDirectly(group);
                         syncWorkerGroupNodes(group, currentNodes);
                         alertDao.sendServerStopedAlert(1, path, "WORKER");
+                    } else if (dataChangeEvent == DataChangeEvent.UPDATE) {
+                        logger.debug("worker group node : {} update, data: 
{}", path, data);
+                        String group = parseGroup(path);
+                        Set<String> currentNodes = 
registryClient.getWorkerGroupNodesDirectly(group);
+                        syncWorkerGroupNodes(group, currentNodes);
+
+                        String node = parseNode(path);
+                        syncSingleWorkerNodeInfo(node, data);
                     }
                 } catch (IllegalArgumentException ex) {
                     logger.warn(ex.getMessage());
@@ -251,6 +259,13 @@ public class ServerNodeManager implements InitializingBean 
{
             return parts[parts.length - 2];
         }
 
+        private String parseNode(String path) {
+            String[] parts = path.split("/");
+            if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
+                throw new IllegalArgumentException(String.format("worker group 
path : %s is not valid, ignore", path));
+            }
+            return parts[parts.length - 1];
+        }
     }
 
     /**
@@ -258,7 +273,7 @@ public class ServerNodeManager implements InitializingBean {
      */
     class MasterDataListener implements SubscribeListener {
         @Override
-        public void notify(String path, DataChangeEvent dataChangeEvent) {
+        public void notify(String path, String data, DataChangeEvent 
dataChangeEvent) {
             if (registryClient.isMasterPath(path)) {
                 try {
                     if (dataChangeEvent.equals(DataChangeEvent.ADD)) {
@@ -407,7 +422,7 @@ public class ServerNodeManager implements InitializingBean {
      *
      * @param newWorkerNodeInfo new worker node info
      */
-    private void syncWorkerNodeInfo(Map<String, String> newWorkerNodeInfo) {
+    private void syncAllWorkerNodeInfo(Map<String, String> newWorkerNodeInfo) {
         workerNodeInfoLock.lock();
         try {
             workerNodeInfo.clear();
@@ -418,6 +433,18 @@ public class ServerNodeManager implements InitializingBean 
{
     }
 
     /**
+     * sync single worker node info
+     */
+    private void syncSingleWorkerNodeInfo(String node, String info) {
+        workerNodeInfoLock.lock();
+        try {
+            workerNodeInfo.put(node, info);
+        } finally {
+            workerNodeInfoLock.unlock();
+        }
+    }
+
+    /**
      * destroy
      */
     @PreDestroy
diff --git 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ListenerManager.java
 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ListenerManager.java
index ee13405..94b13e6 100644
--- 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ListenerManager.java
+++ 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ListenerManager.java
@@ -55,12 +55,12 @@ public class ListenerManager {
      *
      *After the data changes, it is distributed to the corresponding listener 
for processing
      */
-    public static void dataChange(String key,String path, DataChangeEvent 
dataChangeEvent) {
+    public static void dataChange(String key,String path, String data, 
DataChangeEvent dataChangeEvent) {
         SubscribeListener notifyListener = listeners.get(key);
         if (null == notifyListener) {
             return;
         }
-        notifyListener.notify(path,dataChangeEvent);
+        notifyListener.notify(path, data, dataChangeEvent);
     }
 
 }
diff --git 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/SubscribeListener.java
 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/SubscribeListener.java
index 6a2f3d1..3db7f2e 100644
--- 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/SubscribeListener.java
+++ 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/SubscribeListener.java
@@ -25,6 +25,6 @@ public interface SubscribeListener {
     /**
      * Processing logic when the subscription node changes
      */
-    void notify(String path, DataChangeEvent dataChangeEvent);
+    void notify(String path, String data, DataChangeEvent dataChangeEvent);
 
 }

Reply via email to