This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 3106054ea7 [Improvement-12907] Change heartbeat log level to debug
(#12980)
3106054ea7 is described below
commit 3106054ea78a0036069d1860a228723519efab38
Author: Yann Ann <[email protected]>
AuthorDate: Fri Nov 25 17:37:30 2022 +0800
[Improvement-12907] Change heartbeat log level to debug (#12980)
---
.../dispatch/host/LowerWeightHostManager.java | 70 +++++++++++-----------
.../server/master/registry/ServerNodeManager.java | 53 +++++++---------
.../server/master/task/MasterHeartBeatTask.java | 2 +-
.../server/worker/task/WorkerHeartBeatTask.java | 5 +-
4 files changed, 60 insertions(+), 70 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index 75bd9c1a0a..5be20c23c1 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -99,34 +99,44 @@ public class LowerWeightHostManager extends
CommonHostManager {
public void notify(Map<String, Set<String>> workerGroups, Map<String,
WorkerHeartBeat> workerNodeInfo) {
syncWorkerResources(workerGroups, workerNodeInfo);
}
- }
- /**
- * Sync worker resource.
- *
- * @param workerGroupNodes worker group nodes, key is worker group, value
is worker group nodes.
- * @param workerNodeInfoMap worker node info map, key is worker node,
value is worker info.
- */
- private void syncWorkerResources(final Map<String, Set<String>>
workerGroupNodes,
- final Map<String, WorkerHeartBeat>
workerNodeInfoMap) {
- try {
- Map<String, Set<HostWeight>> workerHostWeights = new HashMap<>();
- for (Map.Entry<String, Set<String>> entry :
workerGroupNodes.entrySet()) {
- String workerGroup = entry.getKey();
- Set<String> nodes = entry.getValue();
- Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
- for (String node : nodes) {
- WorkerHeartBeat heartbeat =
workerNodeInfoMap.getOrDefault(node, null);
- Optional<HostWeight> hostWeightOpt = getHostWeight(node,
workerGroup, heartbeat);
- hostWeightOpt.ifPresent(hostWeights::add);
- }
- if (!hostWeights.isEmpty()) {
- workerHostWeights.put(workerGroup, hostWeights);
+ /**
+ * Sync worker resource.
+ *
+ * @param workerGroupNodes worker group nodes, key is worker group,
value is worker group nodes.
+ * @param workerNodeInfoMap worker node info map, key is worker node,
value is worker info.
+ */
+ private void syncWorkerResources(final Map<String, Set<String>>
workerGroupNodes,
+ final Map<String, WorkerHeartBeat>
workerNodeInfoMap) {
+ try {
+ Map<String, Set<HostWeight>> workerHostWeights = new
HashMap<>();
+ for (Map.Entry<String, Set<String>> entry :
workerGroupNodes.entrySet()) {
+ String workerGroup = entry.getKey();
+ Set<String> nodes = entry.getValue();
+ Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
+ for (String node : nodes) {
+ WorkerHeartBeat heartbeat =
workerNodeInfoMap.getOrDefault(node, null);
+ Optional<HostWeight> hostWeightOpt =
getHostWeight(node, workerGroup, heartbeat);
+ hostWeightOpt.ifPresent(hostWeights::add);
+ }
+ if (!hostWeights.isEmpty()) {
+ workerHostWeights.put(workerGroup, hostWeights);
+ }
}
+ syncWorkerHostWeight(workerHostWeights);
+ } catch (Throwable ex) {
+ logger.error("Sync worker resource error", ex);
+ }
+ }
+
+ private void syncWorkerHostWeight(Map<String, Set<HostWeight>>
workerHostWeights) {
+ lock.lock();
+ try {
+ workerHostWeightsMap.clear();
+ workerHostWeightsMap.putAll(workerHostWeights);
+ } finally {
+ lock.unlock();
}
- syncWorkerHostWeight(workerHostWeights);
- } catch (Throwable ex) {
- logger.error("Sync worker resource error", ex);
}
}
@@ -155,16 +165,6 @@ public class LowerWeightHostManager extends
CommonHostManager {
heartBeat.getStartupTime()));
}
- private void syncWorkerHostWeight(Map<String, Set<HostWeight>>
workerHostWeights) {
- lock.lock();
- try {
- workerHostWeightsMap.clear();
- workerHostWeightsMap.putAll(workerHostWeights);
- } finally {
- lock.unlock();
- }
- }
-
private Set<HostWeight> getWorkerHostWeights(String workerGroup) {
lock.lock();
try {
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index e0ffc34373..def3829c8b 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -129,8 +129,7 @@ public class ServerNodeManager implements InitializingBean {
// load nodes from zookeeper
updateMasterNodes();
- updateWorkerNodes();
- updateWorkerGroupMappings();
+ refreshWorkerNodesAndGroupMappings();
// init executor service
executorService =
@@ -153,27 +152,21 @@ public class ServerNodeManager implements
InitializingBean {
@Override
public void run() {
try {
-
// sync worker node info
- updateWorkerNodes();
- updateWorkerGroupMappings();
- notifyWorkerInfoChangeListeners();
+ refreshWorkerNodesAndGroupMappings();
} catch (Exception e) {
logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e);
}
}
}
- protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String>
newWorkerNodeInfo,
- WorkerGroup wg) {
- Set<String> nodes = new HashSet<>();
- String[] addrs = wg.getAddrList().split(Constants.COMMA);
- for (String addr : addrs) {
- if (newWorkerNodeInfo.containsKey(addr)) {
- nodes.add(addr);
- }
- }
- return nodes;
+ /**
+ * Refresh worker nodes and worker group mapping information
+ */
+ private void refreshWorkerNodesAndGroupMappings() {
+ updateWorkerNodes();
+ updateWorkerGroupMappings();
+ notifyWorkerInfoChangeListeners();
}
/**
@@ -204,7 +197,15 @@ public class ServerNodeManager implements InitializingBean
{
} catch (Exception ex) {
logger.error("WorkerGroupListener capture data change and
get data failed", ex);
}
+ }
+ }
+ private void syncSingleWorkerNodeInfo(String workerAddress,
WorkerHeartBeat info) {
+ workerNodeInfoWriteLock.lock();
+ try {
+ workerNodeInfo.put(workerAddress, info);
+ } finally {
+ workerNodeInfoWriteLock.unlock();
}
}
}
@@ -241,8 +242,8 @@ public class ServerNodeManager implements InitializingBean {
try {
registryClient.getLock(nodeLock);
Collection<String> currentNodes =
registryClient.getMasterNodesDirectly();
- List<Server> masterNodes =
registryClient.getServerList(NodeType.MASTER);
- syncMasterNodes(currentNodes, masterNodes);
+ List<Server> masterNodeList =
registryClient.getServerList(NodeType.MASTER);
+ syncMasterNodes(currentNodes, masterNodeList);
} catch (Exception e) {
logger.error("update master nodes error", e);
} finally {
@@ -289,7 +290,6 @@ public class ServerNodeManager implements InitializingBean {
try {
workerGroupNodes.clear();
workerGroupNodes.putAll(tmpWorkerGroupMappings);
- notifyWorkerInfoChangeListeners();
} finally {
workerGroupWriteLock.unlock();
}
@@ -363,15 +363,6 @@ public class ServerNodeManager implements InitializingBean
{
}
}
- private void syncSingleWorkerNodeInfo(String workerAddress,
WorkerHeartBeat info) {
- workerNodeInfoWriteLock.lock();
- try {
- workerNodeInfo.put(workerAddress, info);
- } finally {
- workerNodeInfoWriteLock.unlock();
- }
- }
-
/**
* Add the resource change listener, when the resource changed, the
listener will be notified.
*
@@ -382,10 +373,10 @@ public class ServerNodeManager implements
InitializingBean {
}
private void notifyWorkerInfoChangeListeners() {
- Map<String, Set<String>> workerGroupNodes = getWorkerGroupNodes();
- Map<String, WorkerHeartBeat> workerNodeInfo = getWorkerNodeInfo();
+ Map<String, Set<String>> workerGroupNodeMap = getWorkerGroupNodes();
+ Map<String, WorkerHeartBeat> workerNodeInfoMap = getWorkerNodeInfo();
for (WorkerInfoChangeListener listener : workerInfoChangeListeners) {
- listener.notify(workerGroupNodes, workerNodeInfo);
+ listener.notify(workerGroupNodeMap, workerNodeInfoMap);
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
index 45bf0d74cc..e7dad575ec 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
@@ -68,7 +68,7 @@ public class MasterHeartBeatTask extends
BaseHeartBeatTask<MasterHeartBeat> {
public void writeHeartBeat(MasterHeartBeat masterHeartBeat) {
String masterHeartBeatJson = JSONUtils.toJsonString(masterHeartBeat);
registryClient.persistEphemeral(heartBeatPath, masterHeartBeatJson);
- log.info("Success write master heartBeatInfo into registry,
masterRegistryPath: {}, heartBeatInfo: {}",
+ log.debug("Success write master heartBeatInfo into registry,
masterRegistryPath: {}, heartBeatInfo: {}",
heartBeatPath, masterHeartBeatJson);
}
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
index 18b19a9eb7..2909bd686b 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
@@ -59,9 +59,8 @@ public class WorkerHeartBeatTask extends
BaseHeartBeatTask<WorkerHeartBeat> {
double reservedMemory = workerConfig.getReservedMemory();
double availablePhysicalMemorySize =
OSUtils.availablePhysicalMemorySize();
int execThreads = workerConfig.getExecThreads();
- int workerWaitingTaskCount = this.workerWaitingTaskCount.get();
int serverStatus = getServerStatus(loadAverage, maxCpuLoadAvg,
availablePhysicalMemorySize, reservedMemory,
- execThreads, workerWaitingTaskCount);
+ execThreads, this.workerWaitingTaskCount.get());
return WorkerHeartBeat.builder()
.startupTime(ServerLifeCycleManager.getServerStartupTime())
@@ -86,7 +85,7 @@ public class WorkerHeartBeatTask extends
BaseHeartBeatTask<WorkerHeartBeat> {
String workerHeartBeatJson = JSONUtils.toJsonString(workerHeartBeat);
String workerRegistryPath = workerConfig.getWorkerRegistryPath();
registryClient.persistEphemeral(workerRegistryPath,
workerHeartBeatJson);
- log.info(
+ log.debug(
"Success write worker group heartBeatInfo into registry,
workerRegistryPath: {} workerHeartBeatInfo: {}",
workerRegistryPath, workerHeartBeatJson);
}