This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a34de5006 [IOTDB-3054] Cluster heartbeat framework (#5873)
6a34de5006 is described below

commit 6a34de5006bf459bae0a34f5f473791ab3608f40
Author: YongzaoDan <[email protected]>
AuthorDate: Fri May 13 23:48:03 2022 +0800

    [IOTDB-3054] Cluster heartbeat framework (#5873)
---
 .../resources/conf/iotdb-confignode.properties     | 18 +++++++
 .../confignode/client/AsyncDataNodeClientPool.java | 17 +++++++
 .../client/SyncConfigNodeClientPool.java           |  2 +-
 .../client/handlers/HeartbeatHandler.java          | 49 +++++++++++++++++++
 .../iotdb/confignode/conf/ConfigNodeConf.java      | 24 +++++++++-
 .../confignode/conf/ConfigNodeDescriptor.java      | 10 ++++
 .../confignode/conf/ConfigNodeStartupCheck.java    |  2 +-
 .../request/write/ApplyConfigNodeReq.java          |  2 +-
 .../response/DataNodeConfigurationResp.java        |  2 +-
 .../iotdb/confignode/manager/ConfigManager.java    |  7 +++
 .../iotdb/confignode/manager/ConsensusManager.java |  4 +-
 .../apache/iotdb/confignode/manager/Manager.java   |  1 +
 .../iotdb/confignode/manager/PartitionManager.java |  1 +
 .../confignode/manager/{ => load}/LoadManager.java | 55 ++++++++++++++++++++--
 .../allocator/CopySetRegionAllocator.java          |  2 +-
 .../{ => load}/allocator/IRegionAllocator.java     |  2 +-
 .../{ => load}/balancer/RegionBalancer.java        |  2 +-
 .../balancer/SeriesPartitionSlotBalancer.java      |  2 +-
 .../manager/load/heartbeat/HeartbeatCache.java     | 51 ++++++++++++++++++++
 .../heartbeat/HeartbeatPackage.java}               | 21 ++++++++-
 .../manager/load/heartbeat/HeartbeatWindow.java    | 51 ++++++++++++++++++++
 .../heartbeat/IHeartbeatStatistic.java}            | 19 +++++++-
 .../iotdb/confignode/persistence/NodeInfo.java     |  2 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  2 +-
 .../consensus/request/ConfigRequestSerDeTest.java  |  2 +-
 .../apache/iotdb/commons/utils/NodeUrlUtils.java   |  2 +-
 .../commons/utils/ThriftConfigNodeSerDeUtils.java  |  2 +-
 .../iotdb/commons/utils/NodeUrlUtilsTest.java      |  2 +-
 .../utils/ThriftConfigNodeSerDeUtilsTest.java      |  2 +-
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  2 +-
 .../java/org/apache/iotdb/db/service/DataNode.java |  2 +-
 .../service/thrift/impl/InternalServiceImpl.java   |  8 ++++
 thrift-commons/src/main/thrift/common.thrift       | 13 +++++
 .../src/main/thrift/confignode.thrift              | 13 ++---
 thrift/src/main/thrift/mpp.thrift                  |  6 +++
 35 files changed, 365 insertions(+), 37 deletions(-)

diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index b0fa282e91..2ca585b2d1 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -46,6 +46,7 @@ consensus_port=22278
 # Datatype: String
 target_confignode=0.0.0.0:22277
 
+
 ####################
 ### Consensus protocol configuration
 ####################
@@ -61,6 +62,7 @@ target_confignode=0.0.0.0:22277
 # Datatype: String
 # data_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
 
+
 ####################
 ### PartitionSlot configuration
 ####################
@@ -196,6 +198,7 @@ target_confignode=0.0.0.0:22277
 # If its prefix is "/", then the path is absolute. Otherwise, it is relative.
 # consensus_dir=data/consensus
 
+
 # procedure wal dir
 # If this property is unset, system will save the data in the default relative path directory under the confignode folder(i.e., %CONFIGNODE_HOME%/data/consensus).
 # If it is absolute, system will save the data in exact location it points to.
@@ -207,14 +210,17 @@ target_confignode=0.0.0.0:22277
 # For Linux platform
 # If its prefix is "/", then the path is absolute. Otherwise, it is relative.
 # proc_wal_dir=data/proc
+
 ####################
 ### Procedure Configuration
 ####################
 
+
 # Default number of worker thread count
 # Datatype: int
 #procedure_core_worker_thread_size=4
 
+
 # Default time interval of completed procedure cleaner work in, time unit is second
 # Datatype: int
 #procedure_completed_clean_interval=30
@@ -224,3 +230,15 @@ target_confignode=0.0.0.0:22277
 # Datatype: int
 #procedure_completed_evict_ttl=800
 
+####################
+### Heartbeat configuration
+####################
+
+
+# The heartbeat interval in milliseconds, default is 3000ms
+# Datatype: long
+# heartbeat_interval=3000
+
+
+# This parameter only exists for a few days
+# enable_heartbeat=false
\ No newline at end of file
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
index b4333651c1..3e156986ae 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
@@ -19,9 +19,11 @@
 package org.apache.iotdb.confignode.client;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.THeartbeatReq;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.confignode.client.handlers.CreateRegionHandler;
+import org.apache.iotdb.confignode.client.handlers.HeartbeatHandler;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
 
@@ -81,6 +83,21 @@ public class AsyncDataNodeClientPool {
     }
   }
 
+  /**
+   * Only used in LoadManager
+   *
+   * @param endPoint The specific DataNode
+   */
+  public void getHeartBeat(TEndPoint endPoint, THeartbeatReq req, HeartbeatHandler handler) {
+    AsyncDataNodeInternalServiceClient client;
+    try {
+      client = clientManager.borrowClient(endPoint);
+      client.getHeartBeat(req, handler);
+    } catch (Exception e) {
+      LOGGER.error("Asking DataNode: {}, for heartbeat failed", endPoint, e);
+    }
+  }
+
   /**
    * Always call this interface when a DataNode is restarted or removed
    *
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
index 329ce205d4..484687eacd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
@@ -18,11 +18,11 @@
  */
 package org.apache.iotdb.confignode.client;
 
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java
new file mode 100644
index 0000000000..84b8f2ec71
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.client.handlers;
+
+import org.apache.iotdb.common.rpc.thrift.THeartbeatResp;
+import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatPackage;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+
+public class HeartbeatHandler implements AsyncMethodCallback<THeartbeatResp> {
+
+  // Update HeartbeatCache when success
+  private final int dataNodeId;
+  private final HeartbeatCache heartbeatCache;
+
+  public HeartbeatHandler(int dataNodeId, HeartbeatCache heartbeatCache) {
+    this.dataNodeId = dataNodeId;
+    this.heartbeatCache = heartbeatCache;
+  }
+
+  @Override
+  public void onComplete(THeartbeatResp tHeartbeatResp) {
+    heartbeatCache.cacheHeartBeat(
+        dataNodeId,
+        new HeartbeatPackage(tHeartbeatResp.getHeartbeatTimestamp(), System.currentTimeMillis()));
+  }
+
+  @Override
+  public void onError(Exception e) {
+    // Just ignore heartbeat error
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index 96c481b027..923fe5c700 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -18,9 +18,9 @@
  */
 package org.apache.iotdb.confignode.conf;
 
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.rpc.RpcUtils;
 
 import java.io.File;
@@ -128,6 +128,12 @@ public class ConfigNodeConf {
   private int procedureCoreWorkerThreadsSize =
       Math.max(Runtime.getRuntime().availableProcessors() / 4, 16);
 
+  /** The heartbeat interval in milliseconds */
+  private long heartbeatInterval = 3000;
+
+  /** This parameter only exists for a few days */
+  private boolean enableHeartbeat = false;
+
   ConfigNodeConf() {
     // empty constructor
   }
@@ -381,4 +387,20 @@ public class ConfigNodeConf {
   public void setProcedureCoreWorkerThreadsSize(int procedureCoreWorkerThreadsSize) {
     this.procedureCoreWorkerThreadsSize = procedureCoreWorkerThreadsSize;
   }
+
+  public long getHeartbeatInterval() {
+    return heartbeatInterval;
+  }
+
+  public void setHeartbeatInterval(long heartbeatInterval) {
+    this.heartbeatInterval = heartbeatInterval;
+  }
+
+  public boolean isEnableHeartbeat() {
+    return enableHeartbeat;
+  }
+
+  public void setEnableHeartbeat(boolean enableHeartbeat) {
+    this.enableHeartbeat = enableHeartbeat;
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index b1c4e49328..240021ae40 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -202,6 +202,16 @@ public class ConfigNodeDescriptor {
               properties.getProperty(
                   "maximum_data_region_count", 
String.valueOf(conf.getMaximumDataRegionCount()))));
 
+      conf.setHeartbeatInterval(
+          Long.parseLong(
+              properties.getProperty(
+                  "heartbeat_interval", String.valueOf(conf.getHeartbeatInterval()))));
+
+      conf.setEnableHeartbeat(
+          Boolean.parseBoolean(
+              properties.getProperty(
+                  "enable_heartbeat", String.valueOf(conf.isEnableHeartbeat()))));
+
       // commons
       commonDescriptor.loadCommonProps(properties);
       commonDescriptor.initCommonConfigDir(conf.getSystemDir());
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
index ca4713adf3..7480d78e0c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.confignode.conf;
 
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
@@ -25,7 +26,6 @@ import org.apache.iotdb.commons.exception.ConfigurationException;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.rpc.TSStatusCode;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ApplyConfigNodeReq.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ApplyConfigNodeReq.java
index 0a00beff6e..3eeceae8b7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ApplyConfigNodeReq.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ApplyConfigNodeReq.java
@@ -18,10 +18,10 @@
  */
 package org.apache.iotdb.confignode.consensus.request.write;
 
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.commons.utils.ThriftConfigNodeSerDeUtils;
 import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
 import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationResp.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationResp.java
index f21371b465..c4672cf4b0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationResp.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationResp.java
@@ -18,8 +18,8 @@
  */
 package org.apache.iotdb.confignode.consensus.response;
 
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
 import org.apache.iotdb.consensus.common.DataSet;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 47ebb67034..278859b9c4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.confignode.consensus.response.DataPartitionResp;
 import org.apache.iotdb.confignode.consensus.response.PermissionInfoResp;
 import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
 import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaResp;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.persistence.ClusterSchemaInfo;
 import org.apache.iotdb.confignode.persistence.NodeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
@@ -101,6 +102,12 @@ public class ConfigManager implements Manager {
     this.loadManager = new LoadManager(this);
     this.procedureManager = new ProcedureManager(this);
     this.consensusManager = new ConsensusManager(this);
+
+    // We are on testing.......
+    if (ConfigNodeDescriptor.getInstance().getConf().isEnableHeartbeat()) {
+      // Start asking for heartbeat
+      new Thread(this.loadManager).start();
+    }
   }
 
   public void close() throws IOException {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 8d093e326c..80064c33fc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.confignode.manager;
 
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -28,7 +29,6 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
 import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
 import org.apache.iotdb.confignode.consensus.statemachine.PartitionRegionStateMachine;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.Peer;
@@ -49,7 +49,7 @@ public class ConsensusManager {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ConsensusManager.class);
   private static final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
-  private ConfigManager configManager;
+  private final ConfigManager configManager;
   private ConsensusGroupId consensusGroupId;
   private IConsensus consensusImpl;
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
index 32020b0575..6a7fc0a35f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.confignode.consensus.request.write.SetSchemaReplicationF
 import org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupReq;
 import org.apache.iotdb.confignode.consensus.request.write.SetTTLReq;
 import org.apache.iotdb.confignode.consensus.request.write.SetTimePartitionIntervalReq;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.consensus.common.DataSet;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index e9ec247007..f0db7b0d05 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionsReq;
 import org.apache.iotdb.confignode.consensus.response.DataPartitionResp;
 import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
 import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.persistence.PartitionInfo;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
similarity index 81%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/LoadManager.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 67aa1e7ffb..cae71a74e8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -16,20 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager;
+package org.apache.iotdb.confignode.manager.load;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.THeartbeatReq;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
 import org.apache.iotdb.confignode.client.handlers.CreateRegionHandler;
+import org.apache.iotdb.confignode.client.handlers.HeartbeatHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
 import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
-import org.apache.iotdb.confignode.manager.allocator.CopySetRegionAllocator;
-import org.apache.iotdb.confignode.manager.allocator.IRegionAllocator;
+import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
+import org.apache.iotdb.confignode.manager.ConsensusManager;
+import org.apache.iotdb.confignode.manager.Manager;
+import org.apache.iotdb.confignode.manager.NodeManager;
+import org.apache.iotdb.confignode.manager.PartitionManager;
+import org.apache.iotdb.confignode.manager.load.allocator.CopySetRegionAllocator;
+import org.apache.iotdb.confignode.manager.load.allocator.IRegionAllocator;
+import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatCache;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
@@ -42,23 +51,30 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 /**
  * The LoadManager at ConfigNodeGroup-Leader is active. It proactively 
implements the cluster
  * dynamic load balancing policy and passively accepts the PartitionTable 
expansion request.
  */
-public class LoadManager {
+public class LoadManager implements Runnable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(LoadManager.class);
 
+  private final long heartbeatInterval =
+      ConfigNodeDescriptor.getInstance().getConf().getHeartbeatInterval();
+
   private final Manager configManager;
 
+  private final HeartbeatCache heartbeatCache;
+
   private final IRegionAllocator regionAllocator;
 
   // TODO: Interfaces for active, interrupt and reset LoadBalancer
 
   public LoadManager(Manager configManager) {
     this.configManager = configManager;
+    this.heartbeatCache = new HeartbeatCache();
     this.regionAllocator = new CopySetRegionAllocator();
   }
 
@@ -237,4 +253,35 @@ public class LoadManager {
   private PartitionManager getPartitionManager() {
     return configManager.getPartitionManager();
   }
+
+  private THeartbeatReq genHeartbeatReq() {
+    return new THeartbeatReq(System.currentTimeMillis());
+  }
+
+  @Override
+  public void run() {
+    while (true) {
+      try {
+
+        if (getConsensusManager().isLeader()) {
+          // Ask DataNode for heartbeat in every heartbeat interval
+          List<TDataNodeLocation> onlineDataNodes = getNodeManager().getOnlineDataNodes();
+          for (TDataNodeLocation dataNodeLocation : onlineDataNodes) {
+            HeartbeatHandler handler =
+                new HeartbeatHandler(dataNodeLocation.getDataNodeId(), heartbeatCache);
+            AsyncDataNodeClientPool.getInstance()
+                .getHeartBeat(dataNodeLocation.getInternalEndPoint(), genHeartbeatReq(), handler);
+          }
+
+        } else {
+          heartbeatCache.discardAllCache();
+        }
+
+        TimeUnit.MILLISECONDS.sleep(heartbeatInterval);
+      } catch (InterruptedException e) {
+        LOGGER.error("Heartbeat thread has been interrupted, stopping ConfigNode...", e);
+        System.exit(-1);
+      }
+    }
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/allocator/CopySetRegionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/allocator/CopySetRegionAllocator.java
similarity index 98%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/allocator/CopySetRegionAllocator.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/allocator/CopySetRegionAllocator.java
index 00da9dc05f..7cdc8adf15 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/allocator/CopySetRegionAllocator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/allocator/CopySetRegionAllocator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.allocator;
+package org.apache.iotdb.confignode.manager.load.allocator;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/allocator/IRegionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/allocator/IRegionAllocator.java
similarity index 96%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/allocator/IRegionAllocator.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/allocator/IRegionAllocator.java
index 4874f5a18c..1466fd5d56 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/allocator/IRegionAllocator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/allocator/IRegionAllocator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.allocator;
+package org.apache.iotdb.confignode.manager.load.allocator;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/RegionBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
similarity index 93%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/RegionBalancer.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index bf7e24527b..2e099883d2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/RegionBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -16,6 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.balancer;
+package org.apache.iotdb.confignode.manager.load.balancer;
 
 public class RegionBalancer {}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/SeriesPartitionSlotBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/SeriesPartitionSlotBalancer.java
similarity index 93%
copy from confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/SeriesPartitionSlotBalancer.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/SeriesPartitionSlotBalancer.java
index 441f732264..99225af381 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/SeriesPartitionSlotBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/SeriesPartitionSlotBalancer.java
@@ -16,6 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.balancer;
+package org.apache.iotdb.confignode.manager.load.balancer;
 
 public class SeriesPartitionSlotBalancer {}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatCache.java
new file mode 100644
index 0000000000..29a29ff4f8
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatCache.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.heartbeat;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** HeartbeatCache caches and maintains all the heartbeat data */
+public class HeartbeatCache implements IHeartbeatStatistic {
+
+  private boolean containsCache = false;
+
+  // Map<DataNodeId, HeartbeatWindow>
+  private final Map<Integer, HeartbeatWindow> windowMap;
+
+  public HeartbeatCache() {
+    this.windowMap = new HashMap<>();
+  }
+
+  @Override
+  public void cacheHeartBeat(int dataNodeId, HeartbeatPackage newHeartbeat) {
+    containsCache = true;
+    windowMap
+        .computeIfAbsent(dataNodeId, window -> new HeartbeatWindow())
+        .addHeartbeat(newHeartbeat);
+  }
+
+  @Override
+  public void discardAllCache() {
+    if (containsCache) {
+      containsCache = false;
+      windowMap.clear();
+    }
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/SeriesPartitionSlotBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatPackage.java
similarity index 63%
copy from confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/SeriesPartitionSlotBalancer.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatPackage.java
index 441f732264..b7bbcb9bbb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/SeriesPartitionSlotBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatPackage.java
@@ -16,6 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.balancer;
+package org.apache.iotdb.confignode.manager.load.heartbeat;
 
-public class SeriesPartitionSlotBalancer {}
+/**
+ * Immutable holder for the two timestamps of a single heartbeat: the one taken when it was sent
+ * and the one taken when it was received.
+ */
+public class HeartbeatPackage {
+
+  private final long sendTimestamp;
+  private final long receiveTimestamp;
+
+  /**
+   * @param sendTimestamp The timestamp at which the heartbeat was sent
+   * @param receiveTimestamp The timestamp at which the heartbeat was received
+   */
+  public HeartbeatPackage(long sendTimestamp, long receiveTimestamp) {
+    this.receiveTimestamp = receiveTimestamp;
+    this.sendTimestamp = sendTimestamp;
+  }
+
+  /** @return The send timestamp passed to the constructor */
+  public long getSendTimestamp() {
+    return this.sendTimestamp;
+  }
+
+  /** @return The receive timestamp passed to the constructor */
+  public long getReceiveTimestamp() {
+    return this.receiveTimestamp;
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatWindow.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatWindow.java
new file mode 100644
index 0000000000..6478b10a1a
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatWindow.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.heartbeat;
+
+import java.util.LinkedList;
+
+/**
+ * HeartbeatWindow contains Heartbeat's sending and receiving time, which is used for estimating
+ * when the next heartbeat will arrive.
+ *
+ * <p>The window is bounded: once it holds more than the maximum number of heartbeats, the oldest
+ * ones are evicted.
+ */
+public class HeartbeatWindow {
+
+  // Upper bound on the number of heartbeats kept in the window
+  private static final int MAXIMUM_WINDOW_SIZE = 1000;
+
+  // Oldest heartbeat first, newest last; this list is also the lock guarding its own updates
+  private final LinkedList<HeartbeatPackage> slidingWindow;
+
+  public HeartbeatWindow() {
+    this.slidingWindow = new LinkedList<>();
+  }
+
+  /**
+   * Appends the heartbeat when the window is empty or when its send timestamp is strictly greater
+   * than that of the newest heartbeat held; otherwise the heartbeat is dropped. Afterwards the
+   * oldest heartbeats are removed until the window is within its maximum size.
+   *
+   * @param newHeartbeat The heartbeat to add
+   */
+  public void addHeartbeat(HeartbeatPackage newHeartbeat) {
+    synchronized (slidingWindow) {
+      // Only sequential heartbeats are accepted.
+      // And un-sequential heartbeats will be discarded.
+      boolean sequential =
+          slidingWindow.isEmpty()
+              || newHeartbeat.getSendTimestamp() > slidingWindow.getLast().getSendTimestamp();
+      if (sequential) {
+        slidingWindow.addLast(newHeartbeat);
+      }
+
+      while (slidingWindow.size() > MAXIMUM_WINDOW_SIZE) {
+        slidingWindow.removeFirst();
+      }
+    }
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/SeriesPartitionSlotBalancer.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IHeartbeatStatistic.java
similarity index 59%
rename from 
confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/SeriesPartitionSlotBalancer.java
rename to 
confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IHeartbeatStatistic.java
index 441f732264..9cfdb0890d 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/SeriesPartitionSlotBalancer.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IHeartbeatStatistic.java
@@ -16,6 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.balancer;
+package org.apache.iotdb.confignode.manager.load.heartbeat;
 
-public class SeriesPartitionSlotBalancer {}
+/** All the operations that are provided by HeartbeatCache */
+public interface IHeartbeatStatistic {
+
+  /**
+   * Cache the newest heartbeat of the specific DataNode
+   *
+   * @param dataNodeId The specific DataNodeId
+   * @param newHeartbeat The newest heartbeat, holding its send and receive timestamps
+   */
+  void cacheHeartBeat(int dataNodeId, HeartbeatPackage newHeartbeat);
+
+  // TODO: Interfaces for statistics
+
+  /** Discard all cached heartbeats; only use this when current ConfigNode is not the leader */
+  void discardAllCache();
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index 38af6e207f..308090848c 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.confignode.persistence;
 
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
@@ -28,7 +29,6 @@ import 
org.apache.iotdb.confignode.consensus.request.read.GetDataNodeInfoReq;
 import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
 import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
 import org.apache.iotdb.confignode.consensus.response.DataNodeLocationsResp;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 2abff56aaa..08795217ac 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.confignode.service.thrift;
 
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.auth.AuthException;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -49,7 +50,6 @@ import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
 import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
index 9f6e10a14c..89f9cdfefa 100644
--- 
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.confignode.consensus.request;
 
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -50,7 +51,6 @@ import 
org.apache.iotdb.confignode.consensus.request.write.SetTTLReq;
 import 
org.apache.iotdb.confignode.consensus.request.write.SetTimePartitionIntervalReq;
 import org.apache.iotdb.confignode.consensus.request.write.UpdateProcedureReq;
 import org.apache.iotdb.confignode.procedure.DeleteStorageGroupProcedure;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.procedure.Procedure;
 
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java
index ed76fabcc3..a502e4d42e 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java
@@ -19,9 +19,9 @@
 
 package org.apache.iotdb.commons.utils;
 
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
index ce05737608..2b7bef2986 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
@@ -18,8 +18,8 @@
  */
 package org.apache.iotdb.commons.utils;
 
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 
 import org.apache.thrift.TException;
diff --git 
a/node-commons/src/test/java/org/apache/iotdb/commons/utils/NodeUrlUtilsTest.java
 
b/node-commons/src/test/java/org/apache/iotdb/commons/utils/NodeUrlUtilsTest.java
index 3c7667eea4..f6e497f309 100644
--- 
a/node-commons/src/test/java/org/apache/iotdb/commons/utils/NodeUrlUtilsTest.java
+++ 
b/node-commons/src/test/java/org/apache/iotdb/commons/utils/NodeUrlUtilsTest.java
@@ -18,9 +18,9 @@
  */
 package org.apache.iotdb.commons.utils;
 
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
 
 import org.junit.Assert;
 import org.junit.Test;
diff --git 
a/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtilsTest.java
 
b/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtilsTest.java
index 1f150fd2fd..60e0adef7b 100644
--- 
a/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtilsTest.java
+++ 
b/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtilsTest.java
@@ -18,10 +18,10 @@
  */
 package org.apache.iotdb.commons.utils;
 
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 
 import org.junit.After;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java 
b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 50dd5f4741..ecff7dbcd3 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -19,13 +19,13 @@
 
 package org.apache.iotdb.db.client;
 
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
 import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeLocationResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java 
b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 61b764e2eb..8875150162 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.service;
 
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler;
@@ -27,7 +28,6 @@ import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.JMXService;
 import org.apache.iotdb.commons.service.RegisterManager;
 import org.apache.iotdb.commons.service.StartupChecks;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
 import org.apache.iotdb.db.client.ConfigNodeClient;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index b581cedfe1..1a4661d5b4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.service.thrift.impl;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.THeartbeatReq;
+import org.apache.iotdb.common.rpc.thrift.THeartbeatResp;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -262,6 +264,12 @@ public class InternalServiceImpl implements 
InternalService.Iface {
     return null;
   }
 
+  /**
+   * Answers a heartbeat request by echoing the request's heartbeat timestamp in the response.
+   *
+   * @param req The heartbeat request
+   * @return A response carrying the same heartbeat timestamp as the request
+   */
+  @Override
+  public THeartbeatResp getHeartBeat(THeartbeatReq req) throws TException {
+    // TODO: Return load balancing messages
+    final long echoedTimestamp = req.getHeartbeatTimestamp();
+    return new THeartbeatResp(echoedTimestamp);
+  }
+
   @Override
   public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) throws 
TException {
     long queryIdRaw = SessionManager.getInstance().requestQueryId(false);
diff --git a/thrift-commons/src/main/thrift/common.thrift 
b/thrift-commons/src/main/thrift/common.thrift
index 9b09bc3743..8ffdf3ff40 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -57,6 +57,15 @@ struct TRegionReplicaSet {
   2: required list<TDataNodeLocation> dataNodeLocations
 }
 
+// Location of a ConfigNode, described by two endpoints
+struct TConfigNodeLocation {
+  // presumably the TEndPoint for ConfigNode's internal rpc - TODO confirm
+  1: required TEndPoint internalEndPoint
+  // presumably the TEndPoint for ConfigNode's ConsensusLayer - TODO confirm
+  2: required TEndPoint consensusEndPoint
+}
+
+// Request of InternalService.getHeartBeat
+struct THeartbeatReq {
+  // Timestamp of this heartbeat; the DataNode echoes it back in THeartbeatResp
+  1: required i64 heartbeatTimestamp
+}
+
 struct TDataNodeLocation {
   1: required i32 dataNodeId
   // TEndPoint for DataNode's external rpc
@@ -67,4 +76,8 @@ struct TDataNodeLocation {
   4: required TEndPoint dataBlockManagerEndPoint
   // TEndPoint for DataNode's ConsensusLayer
   5: required TEndPoint consensusEndPoint
+}
+
+// Response of InternalService.getHeartBeat
+struct THeartbeatResp {
+  // The heartbeatTimestamp copied from the THeartbeatReq being answered
+  1: required i64 heartbeatTimestamp
 }
\ No newline at end of file
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift 
b/thrift-confignode/src/main/thrift/confignode.thrift
index 653d9ac2d7..7fa76108e8 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -38,7 +38,7 @@ struct TGlobalConfig {
 
 struct TDataNodeRegisterResp {
   1: required common.TSStatus status
-  2: required list<TConfigNodeLocation> configNodeList
+  2: required list<common.TConfigNodeLocation> configNodeList
   3: optional i32 dataNodeId
   4: optional TGlobalConfig globalConfig
 }
@@ -157,13 +157,8 @@ struct TCheckUserPrivilegesReq{
 }
 
 // ConfigNode
-struct TConfigNodeLocation {
-  1: required common.TEndPoint internalEndPoint
-  2: required common.TEndPoint consensusEndPoint
-}
-
 struct TConfigNodeRegisterReq {
-  1: required TConfigNodeLocation configNodeLocation
+  1: required common.TConfigNodeLocation configNodeLocation
   2: required string dataNodeConsensusProtocolClass
   3: required i32 seriesPartitionSlotNum
   4: required string seriesPartitionExecutorClass
@@ -176,7 +171,7 @@ struct TConfigNodeRegisterReq {
 struct TConfigNodeRegisterResp {
   1: required common.TSStatus status
   2: optional common.TConsensusGroupId partitionRegionId
-  3: optional list<TConfigNodeLocation> configNodeList
+  3: optional list<common.TConfigNodeLocation> configNodeList
 }
 
 service ConfigIService {
@@ -233,5 +228,5 @@ service ConfigIService {
 
   TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req)
 
-  common.TSStatus applyConfigNode(TConfigNodeLocation configNodeLocation)
+  common.TSStatus applyConfigNode(common.TConfigNodeLocation 
configNodeLocation)
 }
\ No newline at end of file
diff --git a/thrift/src/main/thrift/mpp.thrift 
b/thrift/src/main/thrift/mpp.thrift
index 51be9a80b3..dfc26e4da0 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -204,6 +204,12 @@ service InternalService {
    */
   common.TSStatus migrateDataRegion(TMigrateDataRegionReq req)
 
+  /**
+  * ConfigNode will ask DataNode for heartbeat every few seconds.
+  *
+  * @param req The heartbeat request. ConfigNode will send the latest config_node_list and load
+  * balancing policies in THeartbeatReq; for now THeartbeatReq only carries heartbeatTimestamp
+  **/
+  common.THeartbeatResp getHeartBeat(common.THeartbeatReq req)
 }
 
 service DataBlockService {

Reply via email to