This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 72daba5 [DS-6694][Master] Reduce the refresh resource interval of
LowerWeight (#6695)
72daba5 is described below
commit 72daba58cd378a347e9657b994bb631c0704fc2a
Author: wind <[email protected]>
AuthorDate: Fri Nov 5 19:10:36 2021 +0800
[DS-6694][Master] Reduce the refresh resource interval of LowerWeight
(#6695)
* [DS-6694][Master] Reduce the refresh resource interval of LowerWeight
* add server node update event handling
Co-authored-by: caishunfeng <[email protected]>
---
.../dolphinscheduler/common/utils/HeartBeat.java | 1 +
.../registry/zookeeper/ZookeeperRegistry.java | 4 +--
.../registry/zookeeper/ZookeeperRegistryTest.java | 2 +-
.../dispatch/host/LowerWeightHostManager.java | 2 +-
.../registry/MasterRegistryDataListener.java | 3 +-
.../server/master/registry/ServerNodeManager.java | 35 +++++++++++++++++++---
.../spi/register/ListenerManager.java | 4 +--
.../spi/register/SubscribeListener.java | 2 +-
8 files changed, 39 insertions(+), 14 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
index bec0f75..d28cd3d 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java
@@ -184,6 +184,7 @@ public class HeartBeat {
* update server state
*/
public void updateServerState() {
+ this.reportTime = System.currentTimeMillis();
if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize <
reservedMemory) {
logger.warn("current cpu load average {} is too high or available
memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
loadAverage, availablePhysicalMemorySize, maxCpuloadAvg,
reservedMemory);
diff --git
a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
b/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
index 64b0b13..e84666a 100644
---
a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
+++
b/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
@@ -148,14 +148,12 @@ public class ZookeeperRegistry implements Registry {
String dataPath = null;
switch (type) {
case NODE_ADDED:
-
dataPath = event.getData().getPath();
eventType = DataChangeEvent.ADD;
break;
case NODE_UPDATED:
eventType = DataChangeEvent.UPDATE;
dataPath = event.getData().getPath();
-
break;
case NODE_REMOVED:
eventType = DataChangeEvent.REMOVE;
@@ -164,7 +162,7 @@ public class ZookeeperRegistry implements Registry {
default:
}
if (null != eventType && null != dataPath) {
- ListenerManager.dataChange(path, dataPath, eventType);
+ ListenerManager.dataChange(path, dataPath, new
String(event.getData().getData()), eventType);
}
};
treeCache.getListenable().addListener(treeCacheListener);
diff --git
a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
b/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
index 900c7e4..a5dc33b 100644
---
a/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
+++
b/dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
@@ -115,7 +115,7 @@ public class ZookeeperRegistryTest {
class TestListener implements SubscribeListener {
@Override
- public void notify(String path, DataChangeEvent dataChangeEvent) {
+ public void notify(String path, String data, DataChangeEvent
dataChangeEvent) {
logger.info("I'm test listener");
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index f78b957..64a15b5 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -78,7 +78,7 @@ public class LowerWeightHostManager extends CommonHostManager
{
this.workerHostWeightsMap = new ConcurrentHashMap<>();
this.lock = new ReentrantLock();
this.executorService = Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("LowerWeightHostManagerExecutor"));
- this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),
0, 5, TimeUnit.SECONDS);
+ this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),
0, 1, TimeUnit.SECONDS);
}
@PreDestroy
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
index c712ac0..4fd50a0 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
@@ -39,9 +39,8 @@ public class MasterRegistryDataListener implements
SubscribeListener {
masterRegistryClient =
SpringApplicationContext.getBean(MasterRegistryClient.class);
}
-
@Override
- public void notify(String path, DataChangeEvent event) {
+ public void notify(String path, String data, DataChangeEvent event) {
//monitor master
if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS +
Constants.SINGLE_SLASH)) {
handleMasterEvent(event, path);
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index 09f4cc2..1bceeb7 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -190,7 +190,7 @@ public class ServerNodeManager implements InitializingBean {
public void run() {
// sync worker node info
Map<String, String> newWorkerNodeInfo =
registryClient.getServerMaps(NodeType.WORKER, true);
- syncWorkerNodeInfo(newWorkerNodeInfo);
+ syncAllWorkerNodeInfo(newWorkerNodeInfo);
// sync worker group nodes from database
List<WorkerGroup> workerGroupList =
workerGroupMapper.queryAllWorkerGroup();
@@ -218,7 +218,7 @@ public class ServerNodeManager implements InitializingBean {
class WorkerDataListener implements SubscribeListener {
@Override
- public void notify(String path, DataChangeEvent dataChangeEvent) {
+ public void notify(String path, String data, DataChangeEvent
dataChangeEvent) {
if (registryClient.isWorkerPath(path)) {
try {
if (dataChangeEvent == DataChangeEvent.ADD) {
@@ -233,6 +233,14 @@ public class ServerNodeManager implements InitializingBean
{
Set<String> currentNodes =
registryClient.getWorkerGroupNodesDirectly(group);
syncWorkerGroupNodes(group, currentNodes);
alertDao.sendServerStopedAlert(1, path, "WORKER");
+ } else if (dataChangeEvent == DataChangeEvent.UPDATE) {
+ logger.debug("worker group node : {} update, data:
{}", path, data);
+ String group = parseGroup(path);
+ Set<String> currentNodes =
registryClient.getWorkerGroupNodesDirectly(group);
+ syncWorkerGroupNodes(group, currentNodes);
+
+ String node = parseNode(path);
+ syncSingleWorkerNodeInfo(node, data);
}
} catch (IllegalArgumentException ex) {
logger.warn(ex.getMessage());
@@ -251,6 +259,13 @@ public class ServerNodeManager implements InitializingBean
{
return parts[parts.length - 2];
}
+ private String parseNode(String path) {
+ String[] parts = path.split("/");
+ if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
+ throw new IllegalArgumentException(String.format("worker group
path : %s is not valid, ignore", path));
+ }
+ return parts[parts.length - 1];
+ }
}
/**
@@ -258,7 +273,7 @@ public class ServerNodeManager implements InitializingBean {
*/
class MasterDataListener implements SubscribeListener {
@Override
- public void notify(String path, DataChangeEvent dataChangeEvent) {
+ public void notify(String path, String data, DataChangeEvent
dataChangeEvent) {
if (registryClient.isMasterPath(path)) {
try {
if (dataChangeEvent.equals(DataChangeEvent.ADD)) {
@@ -407,7 +422,7 @@ public class ServerNodeManager implements InitializingBean {
*
* @param newWorkerNodeInfo new worker node info
*/
- private void syncWorkerNodeInfo(Map<String, String> newWorkerNodeInfo) {
+ private void syncAllWorkerNodeInfo(Map<String, String> newWorkerNodeInfo) {
workerNodeInfoLock.lock();
try {
workerNodeInfo.clear();
@@ -418,6 +433,18 @@ public class ServerNodeManager implements InitializingBean
{
}
/**
+ * sync single worker node info
+ */
+ private void syncSingleWorkerNodeInfo(String node, String info) {
+ workerNodeInfoLock.lock();
+ try {
+ workerNodeInfo.put(node, info);
+ } finally {
+ workerNodeInfoLock.unlock();
+ }
+ }
+
+ /**
* destroy
*/
@PreDestroy
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ListenerManager.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ListenerManager.java
index ee13405..94b13e6 100644
---
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ListenerManager.java
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/ListenerManager.java
@@ -55,12 +55,12 @@ public class ListenerManager {
*
*After the data changes, it is distributed to the corresponding listener
for processing
*/
- public static void dataChange(String key,String path, DataChangeEvent
dataChangeEvent) {
+ public static void dataChange(String key,String path, String data,
DataChangeEvent dataChangeEvent) {
SubscribeListener notifyListener = listeners.get(key);
if (null == notifyListener) {
return;
}
- notifyListener.notify(path,dataChangeEvent);
+ notifyListener.notify(path, data, dataChangeEvent);
}
}
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/SubscribeListener.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/SubscribeListener.java
index 6a2f3d1..3db7f2e 100644
---
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/SubscribeListener.java
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/register/SubscribeListener.java
@@ -25,6 +25,6 @@ public interface SubscribeListener {
/**
* Processing logic when the subscription node changes
*/
- void notify(String path, DataChangeEvent dataChangeEvent);
+ void notify(String path, String data, DataChangeEvent dataChangeEvent);
}