This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 287576dfb9 [IOTDB-3703] Decouple load balancing from heartbeat (#6578)
287576dfb9 is described below
commit 287576dfb964ffa76b13e7ace47edecfb6a4b486
Author: YongzaoDan <[email protected]>
AuthorDate: Mon Jul 4 14:20:03 2022 +0800
[IOTDB-3703] Decouple load balancing from heartbeat (#6578)
---
.../iotdb/confignode/manager/load/LoadManager.java | 73 +++++++++++++---------
1 file changed, 42 insertions(+), 31 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index f8b0e04358..d01a26fc97 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -90,15 +90,21 @@ public class LoadManager {
private final PartitionBalancer partitionBalancer;
private final RouteBalancer routeBalancer;
- /** heartbeat executor service */
+ /** Heartbeat executor service */
+ private final AtomicInteger heartbeatCounter = new AtomicInteger(0);
+
+ private Future<?> currentHeartbeatFuture;
private final ScheduledExecutorService heartBeatExecutor =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(LoadManager.class.getSimpleName());
- /** monitor for heartbeat state change */
- private final Object heartbeatMonitor = new Object();
+ /** Load balancing executor service */
+ private Future<?> currentLoadBalancingFuture;
- private Future<?> currentHeartbeatFuture;
- private final AtomicInteger balanceCount;
+ private final ScheduledExecutorService loadBalancingExecutor =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(LoadManager.class.getSimpleName());
+
+ /** Monitor for leadership change */
+ private final Object scheduleMonitor = new Object();
public LoadManager(IManager configManager) {
this.configManager = configManager;
@@ -108,8 +114,6 @@ public class LoadManager {
this.regionBalancer = new RegionBalancer(configManager);
this.partitionBalancer = new PartitionBalancer(configManager);
this.routeBalancer = new RouteBalancer(configManager);
-
- this.balanceCount = new AtomicInteger(0);
}
/**
@@ -200,11 +204,11 @@ public class LoadManager {
return result;
}
- /** Start the heartbeat service */
+ /** Start the heartbeat service and the load balancing service */
public void start() {
LOGGER.debug("Start Heartbeat Service of LoadManager");
- synchronized (heartbeatMonitor) {
- balanceCount.set(0);
+ synchronized (scheduleMonitor) {
+ /* Start the heartbeat service */
if (currentHeartbeatFuture == null) {
currentHeartbeatFuture =
ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
@@ -214,16 +218,29 @@ public class LoadManager {
heartbeatInterval,
TimeUnit.MILLISECONDS);
}
+
+ /* Start the load balancing service */
+ if (currentLoadBalancingFuture == null) {
+ currentLoadBalancingFuture =
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ loadBalancingExecutor,
+ this::updateNodeLoadStatistic,
+ 0,
+ heartbeatInterval,
+ TimeUnit.MILLISECONDS);
+ }
}
}
- /** Stop the heartbeat service */
+ /** Stop the heartbeat service and the load balancing service */
public void stop() {
- LOGGER.debug("Stop Heartbeat Service of LoadManager");
- synchronized (heartbeatMonitor) {
+ LOGGER.debug("Stop Heartbeat Service and LoadBalancing Service of LoadManager");
+ synchronized (scheduleMonitor) {
if (currentHeartbeatFuture != null) {
currentHeartbeatFuture.cancel(false);
currentHeartbeatFuture = null;
+ currentLoadBalancingFuture.cancel(false);
+ currentLoadBalancingFuture = null;
}
}
}
@@ -235,31 +252,25 @@ public class LoadManager {
pingOnlineDataNodes(getNodeManager().getOnlineDataNodes(-1));
// Send heartbeat requests to all the registered ConfigNodes
pingRegisteredConfigNodes(getNodeManager().getRegisteredConfigNodes());
- // Do load balancing
- doLoadBalancing();
- balanceCount.getAndIncrement();
}
}
+ private void updateNodeLoadStatistic() {
+ heartbeatCacheMap.values().forEach(IHeartbeatStatistic::updateLoadStatistic);
+ }
+
private THeartbeatReq genHeartbeatReq() {
+ /* Generate heartbeat request */
THeartbeatReq heartbeatReq = new THeartbeatReq();
heartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis());
- // We update RegionGroups' leadership in every 5s
- heartbeatReq.setNeedJudgeLeader(balanceCount.get() % 5 == 0);
- // We sample DataNode load in every 10s
- heartbeatReq.setNeedSamplingLoad(balanceCount.get() % 10 == 0);
- return heartbeatReq;
- }
+ // We update RegionGroups' leadership in every 5 heartbeat loop
+ heartbeatReq.setNeedJudgeLeader(heartbeatCounter.get() % 5 == 0);
+ // We sample DataNode's load in every 10 heartbeat loop
+ heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0);
- private void doLoadBalancing() {
- if (balanceCount.get() % 10 == 0) {
- // We update nodes' load statistic in every 10s
- updateNodeLoadStatistic();
- }
- }
-
- private void updateNodeLoadStatistic() {
- heartbeatCacheMap.values().forEach(IHeartbeatStatistic::updateLoadStatistic);
+ /* Update heartbeat counter */
+ heartbeatCounter.getAndUpdate((x) -> (x + 1) % 10);
+ return heartbeatReq;
}
/**