This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 826a0e4818 [IOTDB-3509] Read/Write Routing Policy (Routing to leader) 
(#6377)
826a0e4818 is described below

commit 826a0e48188dfb1f61a7703175c05fb4e7a37110
Author: YongzaoDan <[email protected]>
AuthorDate: Thu Jun 30 13:59:55 2022 +0800

    [IOTDB-3509] Read/Write Routing Policy (Routing to leader) (#6377)
---
 .../resources/conf/iotdb-confignode.properties     | 13 +++
 .../client/handlers/DataNodeHeartbeatHandler.java  | 29 ++++++-
 .../iotdb/confignode/conf/ConfigNodeConfig.java    | 12 +++
 .../confignode/conf/ConfigNodeDescriptor.java      |  2 +
 .../confignode/conf/ConfigNodeStartupCheck.java    |  7 ++
 .../iotdb/confignode/manager/load/LoadManager.java | 51 +++++++++---
 .../manager/load/balancer/RouteBalancer.java       | 14 +++-
 .../manager/load/balancer/router/LeaderRouter.java | 94 ++++++++++++++++++++++
 .../load/heartbeat/DataNodeHeartbeatCache.java     |  2 +
 .../manager/load/heartbeat/IRegionGroupCache.java  | 37 +++++++++
 .../manager/load/heartbeat/RegionGroupCache.java   | 47 +++++++++++
 .../thrift/impl/DataNodeRPCServiceImpl.java        | 15 ++--
 thrift/src/main/thrift/datanode.thrift             |  4 +-
 13 files changed, 307 insertions(+), 20 deletions(-)

diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties 
b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index 4753238c6a..0c343e7a15 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -273,3 +273,16 @@ target_config_nodes=0.0.0.0:22277
 # The heartbeat interval in milliseconds, default is 1000ms
 # Datatype: long
 # heartbeat_interval=1000
+
+
+####################
+### Routing policy
+####################
+
+
+# The routing policy of read/write requests
+# These routing policy are currently supported:
+# 1. leader(Default, routing to leader replica)
+# 2. greedy(Routing to replica with the lowest load, might cause read 
un-consistent)
+# Datatype: string
+# routing_policy=greedy
\ No newline at end of file
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
index 0da0022626..4dcb53cb3b 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
@@ -18,15 +18,20 @@
  */
 package org.apache.iotdb.confignode.client.handlers;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import 
org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatPackage;
+import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.RegionGroupCache;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
 
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+
 public class DataNodeHeartbeatHandler implements 
AsyncMethodCallback<THeartbeatResp> {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(DataNodeHeartbeatHandler.class);
@@ -34,17 +39,35 @@ public class DataNodeHeartbeatHandler implements 
AsyncMethodCallback<THeartbeatR
   // Update DataNodeHeartbeatCache when success
   private final TDataNodeLocation dataNodeLocation;
   private final DataNodeHeartbeatCache dataNodeHeartbeatCache;
+  private final Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap;
 
   public DataNodeHeartbeatHandler(
-      TDataNodeLocation dataNodeLocation, DataNodeHeartbeatCache 
dataNodeHeartbeatCache) {
+      TDataNodeLocation dataNodeLocation,
+      DataNodeHeartbeatCache dataNodeHeartbeatCache,
+      Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap) {
     this.dataNodeLocation = dataNodeLocation;
     this.dataNodeHeartbeatCache = dataNodeHeartbeatCache;
+    this.regionGroupCacheMap = regionGroupCacheMap;
   }
 
   @Override
-  public void onComplete(THeartbeatResp tHeartbeatResp) {
+  public void onComplete(THeartbeatResp heartbeatResp) {
     dataNodeHeartbeatCache.cacheHeartBeat(
-        new HeartbeatPackage(tHeartbeatResp.getHeartbeatTimestamp(), 
System.currentTimeMillis()));
+        new HeartbeatPackage(heartbeatResp.getHeartbeatTimestamp(), 
System.currentTimeMillis()));
+
+    if (heartbeatResp.isSetJudgedLeaders()) {
+      heartbeatResp
+          .getJudgedLeaders()
+          .forEach(
+              (consensusGroupId, isLeader) -> {
+                if (isLeader) {
+                  regionGroupCacheMap
+                      .computeIfAbsent(consensusGroupId, empty -> new 
RegionGroupCache())
+                      .updateLeader(
+                          heartbeatResp.getHeartbeatTimestamp(), 
dataNodeLocation.getDataNodeId());
+                }
+              });
+    }
   }
 
   @Override
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 1e02b95b8d..00489b91ce 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.conf;
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.rpc.RpcUtils;
 
@@ -145,6 +146,9 @@ public class ConfigNodeConfig {
   /** The heartbeat interval in milliseconds */
   private long heartbeatInterval = 1000;
 
+  /** The routing policy of read/write requests */
+  private String routingPolicy = RouteBalancer.greedyPolicy;
+
   ConfigNodeConfig() {
     // empty constructor
   }
@@ -441,4 +445,12 @@ public class ConfigNodeConfig {
   public void setHeartbeatInterval(long heartbeatInterval) {
     this.heartbeatInterval = heartbeatInterval;
   }
+
+  public String getRoutingPolicy() {
+    return routingPolicy;
+  }
+
+  public void setRoutingPolicy(String routingPolicy) {
+    this.routingPolicy = routingPolicy;
+  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 29b1feb008..55f8235a16 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -227,6 +227,8 @@ public class ConfigNodeDescriptor {
               properties.getProperty(
                   "heartbeat_interval", 
String.valueOf(conf.getHeartbeatInterval()))));
 
+      conf.setRoutingPolicy(properties.getProperty("routing_policy", 
conf.getRoutingPolicy()));
+
       // commons
       commonDescriptor.loadCommonProps(properties);
       commonDescriptor.initCommonConfigDir(conf.getSystemDir());
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
index 1f6f769529..3d31c3ac4f 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.commons.exception.ConfigurationException;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
+import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.consensus.ConsensusFactory;
@@ -124,6 +125,12 @@ public class ConfigNodeStartupCheck {
           String.format(
               "%s or %s", ConsensusFactory.StandAloneConsensus, 
ConsensusFactory.RatisConsensus));
     }
+
+    if (!conf.getRoutingPolicy().equals(RouteBalancer.leaderPolicy)
+        && !conf.getRoutingPolicy().equals(RouteBalancer.greedyPolicy)) {
+      throw new ConfigurationException(
+          "routing_policy", conf.getRoutingPolicy(), "leader or greedy");
+    }
   }
 
   /**
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index bd886256b6..bcfca8114a 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -48,6 +48,7 @@ import 
org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
 import 
org.apache.iotdb.confignode.manager.load.heartbeat.ConfigNodeHeartbeatCache;
 import 
org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.IHeartbeatStatistic;
+import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
 
 import org.slf4j.Logger;
@@ -60,6 +61,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * The LoadManager at ConfigNodeGroup-Leader is active. It proactively 
implements the cluster
@@ -73,11 +75,16 @@ public class LoadManager {
 
   private final long heartbeatInterval =
       ConfigNodeDescriptor.getInstance().getConf().getHeartbeatInterval();
+
+  /** Heartbeat sample cache */
   // Map<NodeId, IHeartbeatStatistic>
   private final Map<Integer, IHeartbeatStatistic> heartbeatCacheMap;
+  // Map<RegionId, RegionGroupCache>
+  private final Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap;
 
-  // Balancers
+  /** Balancers */
   private final RegionBalancer regionBalancer;
+
   private final PartitionBalancer partitionBalancer;
   private final RouteBalancer routeBalancer;
 
@@ -89,15 +96,18 @@ public class LoadManager {
   private final Object heartbeatMonitor = new Object();
 
   private Future<?> currentHeartbeatFuture;
-  private int balanceCount = 0;
+  private final AtomicInteger balanceCount;
 
   public LoadManager(IManager configManager) {
     this.configManager = configManager;
     this.heartbeatCacheMap = new ConcurrentHashMap<>();
+    this.regionGroupCacheMap = new ConcurrentHashMap<>();
 
     this.regionBalancer = new RegionBalancer(configManager);
     this.partitionBalancer = new PartitionBalancer(configManager);
     this.routeBalancer = new RouteBalancer(configManager);
+
+    this.balanceCount = new AtomicInteger(0);
   }
 
   /**
@@ -173,10 +183,26 @@ public class LoadManager {
     return result;
   }
 
+  /**
+   * Get the leadership of each RegionGroup
+   *
+   * @return Map<RegionGroupId, leader location>
+   */
+  public Map<TConsensusGroupId, Integer> getAllLeadership() {
+    Map<TConsensusGroupId, Integer> result = new ConcurrentHashMap<>();
+
+    regionGroupCacheMap.forEach(
+        (consensusGroupId, regionGroupCache) ->
+            result.put(consensusGroupId, 
regionGroupCache.getLeaderDataNodeId()));
+
+    return result;
+  }
+
   /** Start the heartbeat service */
   public void start() {
     LOGGER.debug("Start Heartbeat Service of LoadManager");
     synchronized (heartbeatMonitor) {
+      balanceCount.set(0);
       if (currentHeartbeatFuture == null) {
         currentHeartbeatFuture =
             ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
@@ -208,18 +234,24 @@ public class LoadManager {
       // Send heartbeat requests to all the online ConfigNodes
       pingOnlineConfigNodes(getNodeManager().getOnlineConfigNodes());
       // Do load balancing
-      doLoadBalancing(balanceCount);
-      balanceCount += 1;
+      doLoadBalancing();
+      balanceCount.getAndIncrement();
     }
   }
 
   private THeartbeatReq genHeartbeatReq() {
-    return new THeartbeatReq(System.currentTimeMillis());
+    THeartbeatReq heartbeatReq = new THeartbeatReq();
+    heartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis());
+    // We update RegionGroups' leadership in every 5s
+    heartbeatReq.setNeedJudgeLeader(balanceCount.get() % 5 == 0);
+    // We sample DataNode load in every 10s
+    heartbeatReq.setNeedSamplingLoad(balanceCount.get() % 10 == 0);
+    return heartbeatReq;
   }
 
-  private void doLoadBalancing(int balanceCount) {
-    if (balanceCount % 5 == 0) {
-      // We update nodes' load statistic in every 5s
+  private void doLoadBalancing() {
+    if (balanceCount.get() % 10 == 0) {
+      // We update nodes' load statistic in every 10s
       updateNodeLoadStatistic();
     }
   }
@@ -242,7 +274,8 @@ public class LoadManager {
               (DataNodeHeartbeatCache)
                   heartbeatCacheMap.computeIfAbsent(
                       dataNodeInfo.getLocation().getDataNodeId(),
-                      empty -> new DataNodeHeartbeatCache()));
+                      empty -> new DataNodeHeartbeatCache()),
+              regionGroupCacheMap);
       AsyncDataNodeClientPool.getInstance()
           .getDataNodeHeartBeat(
               dataNodeInfo.getLocation().getInternalEndPoint(), 
genHeartbeatReq(), handler);
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index aebb25a43d..bfcb0996e3 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -20,9 +20,11 @@ package org.apache.iotdb.confignode.manager.load.balancer;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.IManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.load.balancer.router.IRouter;
+import org.apache.iotdb.confignode.manager.load.balancer.router.LeaderRouter;
 import 
org.apache.iotdb.confignode.manager.load.balancer.router.LoadScoreGreedyRouter;
 
 import java.util.List;
@@ -34,6 +36,9 @@ import java.util.Map;
  */
 public class RouteBalancer {
 
+  public static final String leaderPolicy = "leader";
+  public static final String greedyPolicy = "greedy";
+
   private final IManager configManager;
 
   public RouteBalancer(IManager configManager) {
@@ -46,8 +51,13 @@ public class RouteBalancer {
   }
 
   private IRouter genRouter() {
-    // TODO: The Router should be configurable
-    return new LoadScoreGreedyRouter(getLoadManager().getAllLoadScores());
+    String policy = 
ConfigNodeDescriptor.getInstance().getConf().getRoutingPolicy();
+    if (policy.equals(leaderPolicy)) {
+      return new LeaderRouter(
+          getLoadManager().getAllLeadership(), 
getLoadManager().getAllLoadScores());
+    } else {
+      return new LoadScoreGreedyRouter(getLoadManager().getAllLoadScores());
+    }
   }
 
   private LoadManager getLoadManager() {
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
new file mode 100644
index 0000000000..9cc8ef8212
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.balancer.router;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The LeaderRouter always pick the leader Replica */
+public class LeaderRouter implements IRouter {
+
+  // Map<RegionGroupId, leader location>
+  private final Map<TConsensusGroupId, Integer> leaderMap;
+  // Map<DataNodeId, loadScore>
+  private final Map<Integer, Float> loadScoreMap;
+
+  public LeaderRouter(Map<TConsensusGroupId, Integer> leaderMap, Map<Integer, 
Float> loadScoreMap) {
+    this.leaderMap = leaderMap;
+    this.loadScoreMap = loadScoreMap;
+  }
+
+  @Override
+  public Map<TConsensusGroupId, TRegionReplicaSet> genRealTimeRoutingPolicy(
+      List<TRegionReplicaSet> replicaSets) {
+    Map<TConsensusGroupId, TRegionReplicaSet> result = new 
ConcurrentHashMap<>();
+
+    replicaSets.forEach(
+        replicaSet -> {
+          int leaderId = leaderMap.getOrDefault(replicaSet.getRegionId(), -1);
+          TRegionReplicaSet sortedReplicaSet = new TRegionReplicaSet();
+          sortedReplicaSet.setRegionId(replicaSet.getRegionId());
+
+          /* 1. Pick leader if leader exists */
+          if (leaderId != -1) {
+            for (TDataNodeLocation dataNodeLocation : 
replicaSet.getDataNodeLocations()) {
+              if (dataNodeLocation.getDataNodeId() == leaderId) {
+                sortedReplicaSet.addToDataNodeLocations(dataNodeLocation);
+              }
+            }
+          }
+
+          /* 2. Sort replicaSets by loadScore and pick the rest */
+          // List<Pair<loadScore, TDataNodeLocation>> for sorting
+          List<Pair<Double, TDataNodeLocation>> sortList = new Vector<>();
+          replicaSet
+              .getDataNodeLocations()
+              .forEach(
+                  dataNodeLocation -> {
+                    // The absenteeism of loadScoreMap means ConfigNode-leader 
doesn't receive any
+                    // heartbeat from that DataNode.
+                    // In this case we put a maximum loadScore into the 
sortList.
+                    sortList.add(
+                        new Pair<>(
+                            (double)
+                                loadScoreMap.computeIfAbsent(
+                                    dataNodeLocation.getDataNodeId(), empty -> 
Float.MAX_VALUE),
+                            dataNodeLocation));
+                  });
+          sortList.sort(Comparator.comparingDouble(Pair::getLeft));
+          for (Pair<Double, TDataNodeLocation> entry : sortList) {
+            if (entry.getRight().getDataNodeId() != leaderId) {
+              sortedReplicaSet.addToDataNodeLocations(entry.getRight());
+            }
+          }
+
+          result.put(sortedReplicaSet.getRegionId(), sortedReplicaSet);
+        });
+
+    return result;
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
index 3ea8803705..e02023fd24 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
@@ -25,6 +25,8 @@ import java.util.LinkedList;
 /** DataNodeHeartbeatCache caches and maintains all the heartbeat data */
 public class DataNodeHeartbeatCache implements IHeartbeatStatistic {
 
+  // TODO: This class might be split into DataNodeCache and ConfigNodeCache
+
   // Cache heartbeat samples
   private static final int maximumWindowSize = 100;
   private final LinkedList<HeartbeatPackage> slidingWindow;
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
new file mode 100644
index 0000000000..7283665dc5
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.heartbeat;
+
+public interface IRegionGroupCache {
+
+  /**
+   * Update RegionGroup's latest leader
+   *
+   * @param timestamp Judging timestamp
+   * @param dataNodeId Leader location
+   */
+  void updateLeader(long timestamp, int dataNodeId);
+
+  /**
+   * Get RegionGroup's latest leader
+   *
+   * @return The DataNodeId of latest leader
+   */
+  int getLeaderDataNodeId();
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
new file mode 100644
index 0000000000..6ec938ccbc
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.heartbeat;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class RegionGroupCache implements IRegionGroupCache {
+
+  // TODO: This class might be split into SchemaRegionGroupCache and 
DataRegionGroupCache
+
+  private long timestamp;
+
+  private final AtomicInteger leaderDataNodeId;
+
+  public RegionGroupCache() {
+    this.leaderDataNodeId = new AtomicInteger(-1);
+  }
+
+  @Override
+  public synchronized void updateLeader(long timestamp, int dataNodeId) {
+    if (timestamp > this.timestamp) {
+      this.timestamp = timestamp;
+      this.leaderDataNodeId.set(dataNodeId);
+    }
+  }
+
+  @Override
+  public int getLeaderDataNodeId() {
+    return leaderDataNodeId.get();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRPCServiceImpl.java
index 6ba2645e4c..c6a12c5c79 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeRPCServiceImpl.java
@@ -101,7 +101,6 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.stream.Collectors;
 
 public class DataNodeRPCServiceImpl implements IDataNodeRPCService.Iface {
@@ -109,7 +108,6 @@ public class DataNodeRPCServiceImpl implements 
IDataNodeRPCService.Iface {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(DataNodeRPCServiceImpl.class);
   private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
   private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
-  private static final double loadBalanceThreshold = 0.1;
 
   public DataNodeRPCServiceImpl() {
     super();
@@ -317,10 +315,17 @@ public class DataNodeRPCServiceImpl implements 
IDataNodeRPCService.Iface {
 
   @Override
   public THeartbeatResp getDataNodeHeartBeat(THeartbeatReq req) throws 
TException {
-    THeartbeatResp resp = new THeartbeatResp(req.getHeartbeatTimestamp(), 
getJudgedLeaders());
-    Random whetherToGetMetric = new Random();
+    THeartbeatResp resp = new THeartbeatResp();
+    resp.setHeartbeatTimestamp(req.getHeartbeatTimestamp());
+
+    // Judging leader if necessary
+    if (req.isNeedJudgeLeader()) {
+      resp.setJudgedLeaders(getJudgedLeaders());
+    }
+
+    // Sampling load if necessary
     if 
(MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()
-        && whetherToGetMetric.nextDouble() < loadBalanceThreshold) {
+        && req.isNeedSamplingLoad()) {
       long cpuLoad =
           MetricsService.getInstance()
               .getMetricManager()
diff --git a/thrift/src/main/thrift/datanode.thrift 
b/thrift/src/main/thrift/datanode.thrift
index f8da9cfc3b..8b3cf0c419 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -161,11 +161,13 @@ struct TInvalidatePermissionCacheReq {
 
 struct THeartbeatReq {
   1: required i64 heartbeatTimestamp
+  2: required bool needJudgeLeader
+  3: required bool needSamplingLoad
 }
 
 struct THeartbeatResp {
   1: required i64 heartbeatTimestamp
-  2: required map<common.TConsensusGroupId, bool> judgedLeaders
+  2: optional map<common.TConsensusGroupId, bool> judgedLeaders
   3: optional i16 cpu
   4: optional i16 memory
 }

Reply via email to