This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 287576dfb9 [IOTDB-3703] Decouple load balancing from heartbeat (#6578)
287576dfb9 is described below

commit 287576dfb964ffa76b13e7ace47edecfb6a4b486
Author: YongzaoDan <[email protected]>
AuthorDate: Mon Jul 4 14:20:03 2022 +0800

    [IOTDB-3703] Decouple load balancing from heartbeat (#6578)
---
 .../iotdb/confignode/manager/load/LoadManager.java | 73 +++++++++++++---------
 1 file changed, 42 insertions(+), 31 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index f8b0e04358..d01a26fc97 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -90,15 +90,21 @@ public class LoadManager {
   private final PartitionBalancer partitionBalancer;
   private final RouteBalancer routeBalancer;
 
-  /** heartbeat executor service */
+  /** Heartbeat executor service */
+  private final AtomicInteger heartbeatCounter = new AtomicInteger(0);
+
+  private Future<?> currentHeartbeatFuture;
   private final ScheduledExecutorService heartBeatExecutor =
       
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(LoadManager.class.getSimpleName());
 
-  /** monitor for heartbeat state change */
-  private final Object heartbeatMonitor = new Object();
+  /** Load balancing executor service */
+  private Future<?> currentLoadBalancingFuture;
 
-  private Future<?> currentHeartbeatFuture;
-  private final AtomicInteger balanceCount;
+  private final ScheduledExecutorService loadBalancingExecutor =
+      
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(LoadManager.class.getSimpleName());
+
+  /** Monitor for leadership change */
+  private final Object scheduleMonitor = new Object();
 
   public LoadManager(IManager configManager) {
     this.configManager = configManager;
@@ -108,8 +114,6 @@ public class LoadManager {
     this.regionBalancer = new RegionBalancer(configManager);
     this.partitionBalancer = new PartitionBalancer(configManager);
     this.routeBalancer = new RouteBalancer(configManager);
-
-    this.balanceCount = new AtomicInteger(0);
   }
 
   /**
@@ -200,11 +204,11 @@ public class LoadManager {
     return result;
   }
 
-  /** Start the heartbeat service */
+  /** Start the heartbeat service and the load balancing service */
   public void start() {
     LOGGER.debug("Start Heartbeat Service of LoadManager");
-    synchronized (heartbeatMonitor) {
-      balanceCount.set(0);
+    synchronized (scheduleMonitor) {
+      /* Start the heartbeat service */
       if (currentHeartbeatFuture == null) {
         currentHeartbeatFuture =
             ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
@@ -214,16 +218,29 @@ public class LoadManager {
                 heartbeatInterval,
                 TimeUnit.MILLISECONDS);
       }
+
+      /* Start the load balancing service */
+      if (currentLoadBalancingFuture == null) {
+        currentLoadBalancingFuture =
+            ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+                loadBalancingExecutor,
+                this::updateNodeLoadStatistic,
+                0,
+                heartbeatInterval,
+                TimeUnit.MILLISECONDS);
+      }
     }
   }
 
-  /** Stop the heartbeat service */
+  /** Stop the heartbeat service and the load balancing service */
   public void stop() {
-    LOGGER.debug("Stop Heartbeat Service of LoadManager");
-    synchronized (heartbeatMonitor) {
+    LOGGER.debug("Stop Heartbeat Service and LoadBalancing Service of 
LoadManager");
+    synchronized (scheduleMonitor) {
       if (currentHeartbeatFuture != null) {
         currentHeartbeatFuture.cancel(false);
         currentHeartbeatFuture = null;
+        currentLoadBalancingFuture.cancel(false);
+        currentLoadBalancingFuture = null;
       }
     }
   }
@@ -235,31 +252,25 @@ public class LoadManager {
       pingOnlineDataNodes(getNodeManager().getOnlineDataNodes(-1));
       // Send heartbeat requests to all the registered ConfigNodes
       pingRegisteredConfigNodes(getNodeManager().getRegisteredConfigNodes());
-      // Do load balancing
-      doLoadBalancing();
-      balanceCount.getAndIncrement();
     }
   }
 
+  private void updateNodeLoadStatistic() {
+    
heartbeatCacheMap.values().forEach(IHeartbeatStatistic::updateLoadStatistic);
+  }
+
   private THeartbeatReq genHeartbeatReq() {
+    /* Generate heartbeat request */
     THeartbeatReq heartbeatReq = new THeartbeatReq();
     heartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis());
-    // We update RegionGroups' leadership in every 5s
-    heartbeatReq.setNeedJudgeLeader(balanceCount.get() % 5 == 0);
-    // We sample DataNode load in every 10s
-    heartbeatReq.setNeedSamplingLoad(balanceCount.get() % 10 == 0);
-    return heartbeatReq;
-  }
+    // We update RegionGroups' leadership in every 5 heartbeat loop
+    heartbeatReq.setNeedJudgeLeader(heartbeatCounter.get() % 5 == 0);
+    // We sample DataNode's load in every 10 heartbeat loop
+    heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0);
 
-  private void doLoadBalancing() {
-    if (balanceCount.get() % 10 == 0) {
-      // We update nodes' load statistic in every 10s
-      updateNodeLoadStatistic();
-    }
-  }
-
-  private void updateNodeLoadStatistic() {
-    
heartbeatCacheMap.values().forEach(IHeartbeatStatistic::updateLoadStatistic);
+    /* Update heartbeat counter */
+    heartbeatCounter.getAndUpdate((x) -> (x + 1) % 10);
+    return heartbeatReq;
   }
 
   /**

Reply via email to