This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3106054ea7 [Improvement-12907] Change heartbeat log level to debug (#12980)
3106054ea7 is described below

commit 3106054ea78a0036069d1860a228723519efab38
Author: Yann Ann <[email protected]>
AuthorDate: Fri Nov 25 17:37:30 2022 +0800

    [Improvement-12907] Change heartbeat log level to debug (#12980)
---
 .../dispatch/host/LowerWeightHostManager.java      | 70 +++++++++++-----------
 .../server/master/registry/ServerNodeManager.java  | 53 +++++++---------
 .../server/master/task/MasterHeartBeatTask.java    |  2 +-
 .../server/worker/task/WorkerHeartBeatTask.java    |  5 +-
 4 files changed, 60 insertions(+), 70 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index 75bd9c1a0a..5be20c23c1 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -99,34 +99,44 @@ public class LowerWeightHostManager extends 
CommonHostManager {
         public void notify(Map<String, Set<String>> workerGroups, Map<String, 
WorkerHeartBeat> workerNodeInfo) {
             syncWorkerResources(workerGroups, workerNodeInfo);
         }
-    }
 
-    /**
-     * Sync worker resource.
-     *
-     * @param workerGroupNodes  worker group nodes, key is worker group, value 
is worker group nodes.
-     * @param workerNodeInfoMap worker node info map, key is worker node, 
value is worker info.
-     */
-    private void syncWorkerResources(final Map<String, Set<String>> 
workerGroupNodes,
-                                     final Map<String, WorkerHeartBeat> 
workerNodeInfoMap) {
-        try {
-            Map<String, Set<HostWeight>> workerHostWeights = new HashMap<>();
-            for (Map.Entry<String, Set<String>> entry : 
workerGroupNodes.entrySet()) {
-                String workerGroup = entry.getKey();
-                Set<String> nodes = entry.getValue();
-                Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
-                for (String node : nodes) {
-                    WorkerHeartBeat heartbeat = 
workerNodeInfoMap.getOrDefault(node, null);
-                    Optional<HostWeight> hostWeightOpt = getHostWeight(node, 
workerGroup, heartbeat);
-                    hostWeightOpt.ifPresent(hostWeights::add);
-                }
-                if (!hostWeights.isEmpty()) {
-                    workerHostWeights.put(workerGroup, hostWeights);
+        /**
+         * Sync worker resource.
+         *
+         * @param workerGroupNodes  worker group nodes, key is worker group, 
value is worker group nodes.
+         * @param workerNodeInfoMap worker node info map, key is worker node, 
value is worker info.
+         */
+        private void syncWorkerResources(final Map<String, Set<String>> 
workerGroupNodes,
+                                         final Map<String, WorkerHeartBeat> 
workerNodeInfoMap) {
+            try {
+                Map<String, Set<HostWeight>> workerHostWeights = new 
HashMap<>();
+                for (Map.Entry<String, Set<String>> entry : 
workerGroupNodes.entrySet()) {
+                    String workerGroup = entry.getKey();
+                    Set<String> nodes = entry.getValue();
+                    Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
+                    for (String node : nodes) {
+                        WorkerHeartBeat heartbeat = 
workerNodeInfoMap.getOrDefault(node, null);
+                        Optional<HostWeight> hostWeightOpt = 
getHostWeight(node, workerGroup, heartbeat);
+                        hostWeightOpt.ifPresent(hostWeights::add);
+                    }
+                    if (!hostWeights.isEmpty()) {
+                        workerHostWeights.put(workerGroup, hostWeights);
+                    }
                 }
+                syncWorkerHostWeight(workerHostWeights);
+            } catch (Throwable ex) {
+                logger.error("Sync worker resource error", ex);
+            }
+        }
+
+        private void syncWorkerHostWeight(Map<String, Set<HostWeight>> 
workerHostWeights) {
+            lock.lock();
+            try {
+                workerHostWeightsMap.clear();
+                workerHostWeightsMap.putAll(workerHostWeights);
+            } finally {
+                lock.unlock();
             }
-            syncWorkerHostWeight(workerHostWeights);
-        } catch (Throwable ex) {
-            logger.error("Sync worker resource error", ex);
         }
     }
 
@@ -155,16 +165,6 @@ public class LowerWeightHostManager extends 
CommonHostManager {
                         heartBeat.getStartupTime()));
     }
 
-    private void syncWorkerHostWeight(Map<String, Set<HostWeight>> 
workerHostWeights) {
-        lock.lock();
-        try {
-            workerHostWeightsMap.clear();
-            workerHostWeightsMap.putAll(workerHostWeights);
-        } finally {
-            lock.unlock();
-        }
-    }
-
     private Set<HostWeight> getWorkerHostWeights(String workerGroup) {
         lock.lock();
         try {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index e0ffc34373..def3829c8b 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -129,8 +129,7 @@ public class ServerNodeManager implements InitializingBean {
 
         // load nodes from zookeeper
         updateMasterNodes();
-        updateWorkerNodes();
-        updateWorkerGroupMappings();
+        refreshWorkerNodesAndGroupMappings();
 
         // init executor service
         executorService =
@@ -153,27 +152,21 @@ public class ServerNodeManager implements 
InitializingBean {
         @Override
         public void run() {
             try {
-
                 // sync worker node info
-                updateWorkerNodes();
-                updateWorkerGroupMappings();
-                notifyWorkerInfoChangeListeners();
+                refreshWorkerNodesAndGroupMappings();
             } catch (Exception e) {
                 logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e);
             }
         }
     }
 
-    protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> 
newWorkerNodeInfo,
-                                                        WorkerGroup wg) {
-        Set<String> nodes = new HashSet<>();
-        String[] addrs = wg.getAddrList().split(Constants.COMMA);
-        for (String addr : addrs) {
-            if (newWorkerNodeInfo.containsKey(addr)) {
-                nodes.add(addr);
-            }
-        }
-        return nodes;
+    /**
+     * Refresh worker nodes and worker group mapping information
+     */
+    private void refreshWorkerNodesAndGroupMappings() {
+        updateWorkerNodes();
+        updateWorkerGroupMappings();
+        notifyWorkerInfoChangeListeners();
     }
 
     /**
@@ -204,7 +197,15 @@ public class ServerNodeManager implements InitializingBean 
{
                 } catch (Exception ex) {
                     logger.error("WorkerGroupListener capture data change and 
get data failed", ex);
                 }
+            }
+        }
 
+        private void syncSingleWorkerNodeInfo(String workerAddress, 
WorkerHeartBeat info) {
+            workerNodeInfoWriteLock.lock();
+            try {
+                workerNodeInfo.put(workerAddress, info);
+            } finally {
+                workerNodeInfoWriteLock.unlock();
             }
         }
     }
@@ -241,8 +242,8 @@ public class ServerNodeManager implements InitializingBean {
         try {
             registryClient.getLock(nodeLock);
             Collection<String> currentNodes = 
registryClient.getMasterNodesDirectly();
-            List<Server> masterNodes = 
registryClient.getServerList(NodeType.MASTER);
-            syncMasterNodes(currentNodes, masterNodes);
+            List<Server> masterNodeList = 
registryClient.getServerList(NodeType.MASTER);
+            syncMasterNodes(currentNodes, masterNodeList);
         } catch (Exception e) {
             logger.error("update master nodes error", e);
         } finally {
@@ -289,7 +290,6 @@ public class ServerNodeManager implements InitializingBean {
         try {
             workerGroupNodes.clear();
             workerGroupNodes.putAll(tmpWorkerGroupMappings);
-            notifyWorkerInfoChangeListeners();
         } finally {
             workerGroupWriteLock.unlock();
         }
@@ -363,15 +363,6 @@ public class ServerNodeManager implements InitializingBean 
{
         }
     }
 
-    private void syncSingleWorkerNodeInfo(String workerAddress, 
WorkerHeartBeat info) {
-        workerNodeInfoWriteLock.lock();
-        try {
-            workerNodeInfo.put(workerAddress, info);
-        } finally {
-            workerNodeInfoWriteLock.unlock();
-        }
-    }
-
     /**
      * Add the resource change listener, when the resource changed, the 
listener will be notified.
      *
@@ -382,10 +373,10 @@ public class ServerNodeManager implements 
InitializingBean {
     }
 
     private void notifyWorkerInfoChangeListeners() {
-        Map<String, Set<String>> workerGroupNodes = getWorkerGroupNodes();
-        Map<String, WorkerHeartBeat> workerNodeInfo = getWorkerNodeInfo();
+        Map<String, Set<String>> workerGroupNodeMap = getWorkerGroupNodes();
+        Map<String, WorkerHeartBeat> workerNodeInfoMap = getWorkerNodeInfo();
         for (WorkerInfoChangeListener listener : workerInfoChangeListeners) {
-            listener.notify(workerGroupNodes, workerNodeInfo);
+            listener.notify(workerGroupNodeMap, workerNodeInfoMap);
         }
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
index 45bf0d74cc..e7dad575ec 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
@@ -68,7 +68,7 @@ public class MasterHeartBeatTask extends 
BaseHeartBeatTask<MasterHeartBeat> {
     public void writeHeartBeat(MasterHeartBeat masterHeartBeat) {
         String masterHeartBeatJson = JSONUtils.toJsonString(masterHeartBeat);
         registryClient.persistEphemeral(heartBeatPath, masterHeartBeatJson);
-        log.info("Success write master heartBeatInfo into registry, 
masterRegistryPath: {}, heartBeatInfo: {}",
+        log.debug("Success write master heartBeatInfo into registry, 
masterRegistryPath: {}, heartBeatInfo: {}",
                 heartBeatPath, masterHeartBeatJson);
     }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
index 18b19a9eb7..2909bd686b 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
@@ -59,9 +59,8 @@ public class WorkerHeartBeatTask extends 
BaseHeartBeatTask<WorkerHeartBeat> {
         double reservedMemory = workerConfig.getReservedMemory();
         double availablePhysicalMemorySize = 
OSUtils.availablePhysicalMemorySize();
         int execThreads = workerConfig.getExecThreads();
-        int workerWaitingTaskCount = this.workerWaitingTaskCount.get();
         int serverStatus = getServerStatus(loadAverage, maxCpuLoadAvg, 
availablePhysicalMemorySize, reservedMemory,
-                execThreads, workerWaitingTaskCount);
+                execThreads, this.workerWaitingTaskCount.get());
 
         return WorkerHeartBeat.builder()
                 .startupTime(ServerLifeCycleManager.getServerStartupTime())
@@ -86,7 +85,7 @@ public class WorkerHeartBeatTask extends 
BaseHeartBeatTask<WorkerHeartBeat> {
         String workerHeartBeatJson = JSONUtils.toJsonString(workerHeartBeat);
         String workerRegistryPath = workerConfig.getWorkerRegistryPath();
         registryClient.persistEphemeral(workerRegistryPath, 
workerHeartBeatJson);
-        log.info(
+        log.debug(
                 "Success write worker group heartBeatInfo into registry, 
workerRegistryPath: {} workerHeartBeatInfo: {}",
                 workerRegistryPath, workerHeartBeatJson);
     }

Reply via email to