This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6a34de5006 [IOTDB-3054] Cluster heartbeat framework (#5873)
6a34de5006 is described below
commit 6a34de5006bf459bae0a34f5f473791ab3608f40
Author: YongzaoDan <[email protected]>
AuthorDate: Fri May 13 23:48:03 2022 +0800
[IOTDB-3054] Cluster heartbeat framework (#5873)
---
.../resources/conf/iotdb-confignode.properties | 18 +++++++
.../confignode/client/AsyncDataNodeClientPool.java | 17 +++++++
.../client/SyncConfigNodeClientPool.java | 2 +-
.../client/handlers/HeartbeatHandler.java | 49 +++++++++++++++++++
.../iotdb/confignode/conf/ConfigNodeConf.java | 24 +++++++++-
.../confignode/conf/ConfigNodeDescriptor.java | 10 ++++
.../confignode/conf/ConfigNodeStartupCheck.java | 2 +-
.../request/write/ApplyConfigNodeReq.java | 2 +-
.../response/DataNodeConfigurationResp.java | 2 +-
.../iotdb/confignode/manager/ConfigManager.java | 7 +++
.../iotdb/confignode/manager/ConsensusManager.java | 4 +-
.../apache/iotdb/confignode/manager/Manager.java | 1 +
.../iotdb/confignode/manager/PartitionManager.java | 1 +
.../confignode/manager/{ => load}/LoadManager.java | 55 ++++++++++++++++++++--
.../allocator/CopySetRegionAllocator.java | 2 +-
.../{ => load}/allocator/IRegionAllocator.java | 2 +-
.../{ => load}/balancer/RegionBalancer.java | 2 +-
.../balancer/SeriesPartitionSlotBalancer.java | 2 +-
.../manager/load/heartbeat/HeartbeatCache.java | 51 ++++++++++++++++++++
.../heartbeat/HeartbeatPackage.java} | 21 ++++++++-
.../manager/load/heartbeat/HeartbeatWindow.java | 51 ++++++++++++++++++++
.../heartbeat/IHeartbeatStatistic.java} | 19 +++++++-
.../iotdb/confignode/persistence/NodeInfo.java | 2 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 2 +-
.../consensus/request/ConfigRequestSerDeTest.java | 2 +-
.../apache/iotdb/commons/utils/NodeUrlUtils.java | 2 +-
.../commons/utils/ThriftConfigNodeSerDeUtils.java | 2 +-
.../iotdb/commons/utils/NodeUrlUtilsTest.java | 2 +-
.../utils/ThriftConfigNodeSerDeUtilsTest.java | 2 +-
.../apache/iotdb/db/client/ConfigNodeClient.java | 2 +-
.../java/org/apache/iotdb/db/service/DataNode.java | 2 +-
.../service/thrift/impl/InternalServiceImpl.java | 8 ++++
thrift-commons/src/main/thrift/common.thrift | 13 +++++
.../src/main/thrift/confignode.thrift | 13 ++---
thrift/src/main/thrift/mpp.thrift | 6 +++
35 files changed, 365 insertions(+), 37 deletions(-)
diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index b0fa282e91..2ca585b2d1 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -46,6 +46,7 @@ consensus_port=22278
# Datatype: String
target_confignode=0.0.0.0:22277
+
####################
### Consensus protocol configuration
####################
@@ -61,6 +62,7 @@ target_confignode=0.0.0.0:22277
# Datatype: String
#
data_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
+
####################
### PartitionSlot configuration
####################
@@ -196,6 +198,7 @@ target_confignode=0.0.0.0:22277
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# consensus_dir=data/consensus
+
# procedure wal dir
# If this property is unset, system will save the data in the default relative path directory under the confignode folder(i.e., %CONFIGNODE_HOME%/data/consensus).
# If it is absolute, system will save the data in exact location it points to.
@@ -207,14 +210,17 @@ target_confignode=0.0.0.0:22277
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# proc_wal_dir=data/proc
+
####################
### Procedure Configuration
####################
+
# Default number of worker thread count
# Datatype: int
#procedure_core_worker_thread_size=4
+
# Default time interval of completed procedure cleaner work in, time unit is second
# Datatype: int
#procedure_completed_clean_interval=30
@@ -224,3 +230,15 @@ target_confignode=0.0.0.0:22277
# Datatype: int
#procedure_completed_evict_ttl=800
+####################
+### Heartbeat configuration
+####################
+
+
+# The heartbeat interval in milliseconds, default is 3000ms
+# Datatype: long
+# heartbeat_interval=3000
+
+
+# Temporary switch for the heartbeat framework while it is being tested; this parameter will be removed soon
+# enable_heartbeat=false
\ No newline at end of file
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
index b4333651c1..3e156986ae 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
@@ -19,9 +19,11 @@
package org.apache.iotdb.confignode.client;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.commons.client.IClientManager;
import
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import org.apache.iotdb.confignode.client.handlers.CreateRegionHandler;
+import org.apache.iotdb.confignode.client.handlers.HeartbeatHandler;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
@@ -81,6 +83,21 @@ public class AsyncDataNodeClientPool {
}
}
+ /**
+ * Sends a heartbeat request to one DataNode through a client borrowed from clientManager. The
+ * outcome is reported to the given handler. Only used in LoadManager.
+ *
+ * <p>Any exception raised while borrowing the client or issuing the call is logged at error
+ * level and not rethrown, so callers never see a failure from this method.
+ *
+ * @param endPoint The specific DataNode
+ * @param req The heartbeat request to send
+ * @param handler The callback that is handed to the client together with the request
+ */
+ public void getHeartBeat(TEndPoint endPoint, THeartbeatReq req,
HeartbeatHandler handler) {
+ AsyncDataNodeInternalServiceClient client;
+ try {
+ client = clientManager.borrowClient(endPoint);
+ client.getHeartBeat(req, handler);
+ } catch (Exception e) {
+ LOGGER.error("Asking DataNode: {}, for heartbeat failed", endPoint, e);
+ }
+ }
+
/**
* Always call this interface when a DataNode is restarted or removed
*
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
index 329ce205d4..484687eacd 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
@@ -18,11 +18,11 @@
*/
package org.apache.iotdb.confignode.client;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java
new file mode 100644
index 0000000000..84b8f2ec71
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/HeartbeatHandler.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.client.handlers;
+
+import org.apache.iotdb.common.rpc.thrift.THeartbeatResp;
+import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatPackage;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+
+public class HeartbeatHandler implements AsyncMethodCallback<THeartbeatResp> {
+ // Callback for the heartbeat request sent to one DataNode: a response is stored in the HeartbeatCache.
+ // Id of the DataNode that was asked, and the cache that is updated on success
+ private final int dataNodeId;
+ private final HeartbeatCache heartbeatCache;
+
+ public HeartbeatHandler(int dataNodeId, HeartbeatCache heartbeatCache) {
+ this.dataNodeId = dataNodeId;
+ this.heartbeatCache = heartbeatCache;
+ }
+
+ @Override
+ public void onComplete(THeartbeatResp tHeartbeatResp) { // pairs the response timestamp with the local receive time
+ heartbeatCache.cacheHeartBeat(
+ dataNodeId,
+ new HeartbeatPackage(tHeartbeatResp.getHeartbeatTimestamp(),
System.currentTimeMillis()));
+ }
+
+ @Override
+ public void onError(Exception e) {
+ // Heartbeat errors are ignored on purpose. NOTE(review): a failed request leaves no trace in the log or the cache - confirm this is acceptable
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index 96c481b027..923fe5c700 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -18,9 +18,9 @@
*/
package org.apache.iotdb.confignode.conf;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.rpc.RpcUtils;
import java.io.File;
@@ -128,6 +128,12 @@ public class ConfigNodeConf {
private int procedureCoreWorkerThreadsSize =
Math.max(Runtime.getRuntime().availableProcessors() / 4, 16);
+ /** Time the heartbeat loop sleeps between two rounds of requests, in milliseconds (default 3000) */
+ private long heartbeatInterval = 3000;
+
+ /** Temporary switch, off by default: ConfigManager starts the heartbeat thread only when it is true */
+ private boolean enableHeartbeat = false;
+
ConfigNodeConf() {
// empty constructor
}
@@ -381,4 +387,20 @@ public class ConfigNodeConf {
public void setProcedureCoreWorkerThreadsSize(int
procedureCoreWorkerThreadsSize) {
this.procedureCoreWorkerThreadsSize = procedureCoreWorkerThreadsSize;
}
+
+ public long getHeartbeatInterval() { // in milliseconds
+ return heartbeatInterval;
+ }
+
+ public void setHeartbeatInterval(long heartbeatInterval) { // in milliseconds; stored as given, without validation
+ this.heartbeatInterval = heartbeatInterval;
+ }
+
+ public boolean isEnableHeartbeat() { // temporary switch that gates the heartbeat thread
+ return enableHeartbeat;
+ }
+
+ public void setEnableHeartbeat(boolean enableHeartbeat) { // temporary switch that gates the heartbeat thread
+ this.enableHeartbeat = enableHeartbeat;
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index b1c4e49328..240021ae40 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -202,6 +202,16 @@ public class ConfigNodeDescriptor {
properties.getProperty(
"maximum_data_region_count",
String.valueOf(conf.getMaximumDataRegionCount()))));
+ conf.setHeartbeatInterval(
+ Long.parseLong(
+ properties.getProperty(
+ "heartbeat_interval",
String.valueOf(conf.getHeartbeatInterval()))));
+
+ conf.setEnableHeartbeat(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_heartbeat",
String.valueOf(conf.isEnableHeartbeat()))));
+
// commons
commonDescriptor.loadCommonProps(properties);
commonDescriptor.initCommonConfigDir(conf.getSystemDir());
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
index ca4713adf3..7480d78e0c 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.confignode.conf;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
@@ -25,7 +26,6 @@ import
org.apache.iotdb.commons.exception.ConfigurationException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.rpc.TSStatusCode;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ApplyConfigNodeReq.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ApplyConfigNodeReq.java
index 0a00beff6e..3eeceae8b7 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ApplyConfigNodeReq.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/ApplyConfigNodeReq.java
@@ -18,10 +18,10 @@
*/
package org.apache.iotdb.confignode.consensus.request.write;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.commons.utils.ThriftConfigNodeSerDeUtils;
import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
import java.io.IOException;
import java.nio.ByteBuffer;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationResp.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationResp.java
index f21371b465..c4672cf4b0 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationResp.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationResp.java
@@ -18,8 +18,8 @@
*/
package org.apache.iotdb.confignode.consensus.response;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.consensus.common.DataSet;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 47ebb67034..278859b9c4 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -48,6 +48,7 @@ import
org.apache.iotdb.confignode.consensus.response.DataPartitionResp;
import org.apache.iotdb.confignode.consensus.response.PermissionInfoResp;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaResp;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.persistence.ClusterSchemaInfo;
import org.apache.iotdb.confignode.persistence.NodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
@@ -101,6 +102,12 @@ public class ConfigManager implements Manager {
this.loadManager = new LoadManager(this);
this.procedureManager = new ProcedureManager(this);
this.consensusManager = new ConsensusManager(this);
+
+ // The heartbeat framework is still being tested, so it only runs when explicitly enabled
+ if (ConfigNodeDescriptor.getInstance().getConf().isEnableHeartbeat()) {
+ // Start asking for heartbeat
+ new Thread(this.loadManager).start();
+ }
}
public void close() throws IOException {
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 8d093e326c..80064c33fc 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.confignode.manager;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -28,7 +29,6 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
import
org.apache.iotdb.confignode.consensus.statemachine.PartitionRegionStateMachine;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.Peer;
@@ -49,7 +49,7 @@ public class ConsensusManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(ConsensusManager.class);
private static final ConfigNodeConf conf =
ConfigNodeDescriptor.getInstance().getConf();
- private ConfigManager configManager;
+ private final ConfigManager configManager;
private ConsensusGroupId consensusGroupId;
private IConsensus consensusImpl;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
index 32020b0575..6a7fc0a35f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
@@ -32,6 +32,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.SetSchemaReplicationF
import org.apache.iotdb.confignode.consensus.request.write.SetStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.write.SetTTLReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetTimePartitionIntervalReq;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.consensus.common.DataSet;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index e9ec247007..f0db7b0d05 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.confignode.consensus.request.write.DeleteRegionsReq;
import org.apache.iotdb.confignode.consensus.response.DataPartitionResp;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.persistence.PartitionInfo;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/LoadManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
similarity index 81%
rename from
confignode/src/main/java/org/apache/iotdb/confignode/manager/LoadManager.java
rename to
confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 67aa1e7ffb..cae71a74e8 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/LoadManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -16,20 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager;
+package org.apache.iotdb.confignode.manager.load;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.handlers.CreateRegionHandler;
+import org.apache.iotdb.confignode.client.handlers.HeartbeatHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
-import org.apache.iotdb.confignode.manager.allocator.CopySetRegionAllocator;
-import org.apache.iotdb.confignode.manager.allocator.IRegionAllocator;
+import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
+import org.apache.iotdb.confignode.manager.ConsensusManager;
+import org.apache.iotdb.confignode.manager.Manager;
+import org.apache.iotdb.confignode.manager.NodeManager;
+import org.apache.iotdb.confignode.manager.PartitionManager;
+import
org.apache.iotdb.confignode.manager.load.allocator.CopySetRegionAllocator;
+import org.apache.iotdb.confignode.manager.load.allocator.IRegionAllocator;
+import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatCache;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
@@ -42,23 +51,30 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
/**
* The LoadManager at ConfigNodeGroup-Leader is active. It proactively
implements the cluster
* dynamic load balancing policy and passively accepts the PartitionTable
expansion request.
*/
-public class LoadManager {
+public class LoadManager implements Runnable {
private static final Logger LOGGER =
LoggerFactory.getLogger(LoadManager.class);
+ private final long heartbeatInterval =
+ ConfigNodeDescriptor.getInstance().getConf().getHeartbeatInterval();
+
private final Manager configManager;
+ private final HeartbeatCache heartbeatCache;
+
private final IRegionAllocator regionAllocator;
// TODO: Interfaces for active, interrupt and reset LoadBalancer
public LoadManager(Manager configManager) {
this.configManager = configManager;
+ this.heartbeatCache = new HeartbeatCache();
this.regionAllocator = new CopySetRegionAllocator();
}
@@ -237,4 +253,35 @@ public class LoadManager {
private PartitionManager getPartitionManager() {
return configManager.getPartitionManager();
}
+
+ private THeartbeatReq genHeartbeatReq() { // builds a request stamped with the current wall-clock time in milliseconds
+ return new THeartbeatReq(System.currentTimeMillis());
+ }
+
+ @Override
+ public void run() { // heartbeat loop; ConfigManager runs it in its own thread when heartbeat is enabled
+ while (true) {
+ try { // NOTE(review): only InterruptedException is caught; any other exception thrown in a round ends this loop
+
+ if (getConsensusManager().isLeader()) {
+ // Only the leader asks: send one heartbeat request to every online DataNode in each round
+ List<TDataNodeLocation> onlineDataNodes =
getNodeManager().getOnlineDataNodes();
+ for (TDataNodeLocation dataNodeLocation : onlineDataNodes) {
+ HeartbeatHandler handler =
+ new HeartbeatHandler(dataNodeLocation.getDataNodeId(),
heartbeatCache);
+ AsyncDataNodeClientPool.getInstance()
+ .getHeartBeat(dataNodeLocation.getInternalEndPoint(),
genHeartbeatReq(), handler);
+ }
+
+ } else { // not the leader: drop all cached heartbeats
+ heartbeatCache.discardAllCache();
+ }
+
+ TimeUnit.MILLISECONDS.sleep(heartbeatInterval); // wait one heartbeat interval before the next round
+ } catch (InterruptedException e) { // NOTE(review): an interrupt terminates the whole JVM with exit code -1 - confirm this is intended
+ LOGGER.error("Heartbeat thread has been interrupted, stopping
ConfigNode...", e);
+ System.exit(-1);
+ }
+ }
+ }
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/allocator/CopySetRegionAllocator.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/allocator/CopySetRegionAllocator.java
similarity index 98%
rename from
confignode/src/main/java/org/apache/iotdb/confignode/manager/allocator/CopySetRegionAllocator.java
rename to
confignode/src/main/java/org/apache/iotdb/confignode/manager/load/allocator/CopySetRegionAllocator.java
index 00da9dc05f..7cdc8adf15 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/allocator/CopySetRegionAllocator.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/allocator/CopySetRegionAllocator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.allocator;
+package org.apache.iotdb.confignode.manager.load.allocator;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/allocator/IRegionAllocator.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/allocator/IRegionAllocator.java
similarity index 96%
rename from
confignode/src/main/java/org/apache/iotdb/confignode/manager/allocator/IRegionAllocator.java
rename to
confignode/src/main/java/org/apache/iotdb/confignode/manager/load/allocator/IRegionAllocator.java
index 4874f5a18c..1466fd5d56 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/allocator/IRegionAllocator.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/allocator/IRegionAllocator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.allocator;
+package org.apache.iotdb.confignode.manager.load.allocator;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/RegionBalancer.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
similarity index 93%
rename from
confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/RegionBalancer.java
rename to
confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index bf7e24527b..2e099883d2 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/RegionBalancer.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -16,6 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.balancer;
+package org.apache.iotdb.confignode.manager.load.balancer;
public class RegionBalancer {}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/SeriesPartitionSlotBalancer.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/SeriesPartitionSlotBalancer.java
similarity index 93%
copy from
confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/SeriesPartitionSlotBalancer.java
copy to
confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/SeriesPartitionSlotBalancer.java
index 441f732264..99225af381 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/SeriesPartitionSlotBalancer.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/SeriesPartitionSlotBalancer.java
@@ -16,6 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.balancer;
+package org.apache.iotdb.confignode.manager.load.balancer;
public class SeriesPartitionSlotBalancer {}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatCache.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatCache.java
new file mode 100644
index 0000000000..29a29ff4f8
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatCache.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.heartbeat;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * HeartbeatCache keeps one HeartbeatWindow per DataNode and records every heartbeat it is given.
+ *
+ * <p>NOTE(review): windowMap is a plain HashMap and containsCache is a plain boolean, yet
+ * cacheHeartBeat is called from HeartbeatHandler.onComplete while discardAllCache is called from
+ * LoadManager.run. If those run on different threads this class needs synchronization - confirm.
+ */
+public class HeartbeatCache implements IHeartbeatStatistic {
+
+ private boolean containsCache = false; // true once a heartbeat was cached since the last discard
+
+ // Map<DataNodeId, HeartbeatWindow>
+ private final Map<Integer, HeartbeatWindow> windowMap;
+
+ public HeartbeatCache() {
+ this.windowMap = new HashMap<>();
+ }
+
+ @Override
+ public void cacheHeartBeat(int dataNodeId, HeartbeatPackage newHeartbeat) {
+ containsCache = true;
+ windowMap
+ .computeIfAbsent(dataNodeId, window -> new HeartbeatWindow()) // the first heartbeat of a DataNode creates its window
+ .addHeartbeat(newHeartbeat);
+ }
+
+ @Override
+ public void discardAllCache() {
+ if (containsCache) { // skip the clear when nothing was cached since the last discard
+ containsCache = false;
+ windowMap.clear();
+ }
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/SeriesPartitionSlotBalancer.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatPackage.java
similarity index 63%
copy from
confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/SeriesPartitionSlotBalancer.java
copy to
confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatPackage.java
index 441f732264..b7bbcb9bbb 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/SeriesPartitionSlotBalancer.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatPackage.java
@@ -16,6 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.balancer;
+package org.apache.iotdb.confignode.manager.load.heartbeat;
-public class SeriesPartitionSlotBalancer {}
+public class HeartbeatPackage {
+ // Immutable pair of timestamps describing one heartbeat.
+ private final long sendTimestamp; // HeartbeatHandler passes the timestamp carried by the heartbeat response
+ private final long receiveTimestamp; // HeartbeatHandler passes System.currentTimeMillis() taken in onComplete
+
+ public HeartbeatPackage(long sendTimestamp, long receiveTimestamp) {
+ this.sendTimestamp = sendTimestamp;
+ this.receiveTimestamp = receiveTimestamp;
+ }
+
+ public long getSendTimestamp() {
+ return sendTimestamp;
+ }
+
+ public long getReceiveTimestamp() {
+ return receiveTimestamp;
+ }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatWindow.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatWindow.java
new file mode 100644
index 0000000000..6478b10a1a
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatWindow.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.heartbeat;
+
+import java.util.LinkedList;
+
+/**
+ * HeartbeatWindow contains the sending and receiving times of recent heartbeats, which are used
+ * for estimating when the next heartbeat will arrive.
+ *
+ * <p>The window keeps at most {@code MAXIMUM_WINDOW_SIZE} heartbeats; the oldest ones are dropped
+ * first. Insertions are synchronized on the underlying list.
+ */
+public class HeartbeatWindow {
+
+  private static final int MAXIMUM_WINDOW_SIZE = 1000;
+
+  private final LinkedList<HeartbeatPackage> slidingWindow = new LinkedList<>();
+
+  public HeartbeatWindow() {
+    // The window starts out empty; see the field initializer.
+  }
+
+  /**
+   * Appends a heartbeat to the window if it is newer than the latest one already stored.
+   *
+   * @param newHeartbeat the heartbeat to add; it is discarded when its send timestamp is not
+   *     strictly greater than that of the last stored heartbeat
+   */
+  public void addHeartbeat(HeartbeatPackage newHeartbeat) {
+    synchronized (slidingWindow) {
+      // Only sequential heartbeats are accepted; un-sequential heartbeats are discarded.
+      final boolean sequential =
+          slidingWindow.isEmpty()
+              || slidingWindow.getLast().getSendTimestamp() < newHeartbeat.getSendTimestamp();
+      if (sequential) {
+        slidingWindow.addLast(newHeartbeat);
+      }
+
+      // Evict from the head until the window is back within its size limit.
+      while (slidingWindow.size() > MAXIMUM_WINDOW_SIZE) {
+        slidingWindow.removeFirst();
+      }
+    }
+  }
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/SeriesPartitionSlotBalancer.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IHeartbeatStatistic.java
similarity index 59%
rename from
confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/SeriesPartitionSlotBalancer.java
rename to
confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IHeartbeatStatistic.java
index 441f732264..9cfdb0890d 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/balancer/SeriesPartitionSlotBalancer.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IHeartbeatStatistic.java
@@ -16,6 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.balancer;
+package org.apache.iotdb.confignode.manager.load.heartbeat;
-public class SeriesPartitionSlotBalancer {}
+/** The operations provided by HeartbeatCache. */
+public interface IHeartbeatStatistic {
+
+ /**
+ * Cache the newest HeartbeatPackage of the specified DataNode.
+ *
+ * @param dataNodeId The id of the DataNode the heartbeat belongs to
+ * @param newHeartbeat The newest HeartbeatPackage of that DataNode
+ */
+ void cacheHeartBeat(int dataNodeId, HeartbeatPackage newHeartbeat);
+
+ // TODO: Interfaces for statistics
+
+ /**
+ * Discard all cached heartbeats.
+ *
+ * <p>Only use this interface when the current ConfigNode is not the leader.
+ */
+ void discardAllCache();
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index 38af6e207f..308090848c 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.confignode.persistence;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
@@ -28,7 +29,6 @@ import
org.apache.iotdb.confignode.consensus.request.read.GetDataNodeInfoReq;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
import org.apache.iotdb.confignode.consensus.response.DataNodeLocationsResp;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 2abff56aaa..08795217ac 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.confignode.service.thrift;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -49,7 +50,6 @@ import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
diff --git
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
index 9f6e10a14c..89f9cdfefa 100644
---
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
+++
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.confignode.consensus.request;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -50,7 +51,6 @@ import
org.apache.iotdb.confignode.consensus.request.write.SetTTLReq;
import
org.apache.iotdb.confignode.consensus.request.write.SetTimePartitionIntervalReq;
import org.apache.iotdb.confignode.consensus.request.write.UpdateProcedureReq;
import org.apache.iotdb.confignode.procedure.DeleteStorageGroupProcedure;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.procedure.Procedure;
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java
b/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java
index ed76fabcc3..a502e4d42e 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java
@@ -19,9 +19,9 @@
package org.apache.iotdb.commons.utils;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
index ce05737608..2b7bef2986 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtils.java
@@ -18,8 +18,8 @@
*/
package org.apache.iotdb.commons.utils;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.thrift.TException;
diff --git
a/node-commons/src/test/java/org/apache/iotdb/commons/utils/NodeUrlUtilsTest.java
b/node-commons/src/test/java/org/apache/iotdb/commons/utils/NodeUrlUtilsTest.java
index 3c7667eea4..f6e497f309 100644
---
a/node-commons/src/test/java/org/apache/iotdb/commons/utils/NodeUrlUtilsTest.java
+++
b/node-commons/src/test/java/org/apache/iotdb/commons/utils/NodeUrlUtilsTest.java
@@ -18,9 +18,9 @@
*/
package org.apache.iotdb.commons.utils;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
import org.junit.Assert;
import org.junit.Test;
diff --git
a/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtilsTest.java
b/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtilsTest.java
index 1f150fd2fd..60e0adef7b 100644
---
a/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtilsTest.java
+++
b/node-commons/src/test/java/org/apache/iotdb/commons/utils/ThriftConfigNodeSerDeUtilsTest.java
@@ -18,10 +18,10 @@
*/
package org.apache.iotdb.commons.utils;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.junit.After;
diff --git
a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 50dd5f4741..ecff7dbcd3 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -19,13 +19,13 @@
package org.apache.iotdb.db.client;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeLocationResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 61b764e2eb..8875150162 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.service;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler;
@@ -27,7 +28,6 @@ import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.JMXService;
import org.apache.iotdb.commons.service.RegisterManager;
import org.apache.iotdb.commons.service.StartupChecks;
-import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
import org.apache.iotdb.db.client.ConfigNodeClient;
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index b581cedfe1..1a4661d5b4 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.service.thrift.impl;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.THeartbeatReq;
+import org.apache.iotdb.common.rpc.thrift.THeartbeatResp;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -262,6 +264,12 @@ public class InternalServiceImpl implements
InternalService.Iface {
return null;
}
+ /**
+  * Answers a heartbeat request by echoing the request's timestamp back in the response.
+  *
+  * @param req the heartbeat request
+  * @return a response carrying the same heartbeat timestamp as {@code req}
+  */
+ @Override
+ public THeartbeatResp getHeartBeat(THeartbeatReq req) throws TException {
+   // TODO: Return load balancing messages
+   final long requestTimestamp = req.getHeartbeatTimestamp();
+   return new THeartbeatResp(requestTimestamp);
+ }
+
@Override
public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) throws
TException {
long queryIdRaw = SessionManager.getInstance().requestQueryId(false);
diff --git a/thrift-commons/src/main/thrift/common.thrift
b/thrift-commons/src/main/thrift/common.thrift
index 9b09bc3743..8ffdf3ff40 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -57,6 +57,15 @@ struct TRegionReplicaSet {
2: required list<TDataNodeLocation> dataNodeLocations
}
+// Location of a ConfigNode, described by two endpoints.
+// NOTE(review): presumably internalEndPoint serves internal RPC and consensusEndPoint serves the
+// ConsensusLayer, mirroring TDataNodeLocation - confirm.
+struct TConfigNodeLocation {
+ 1: required TEndPoint internalEndPoint
+ 2: required TEndPoint consensusEndPoint
+}
+
+// Request of InternalService.getHeartBeat. It currently carries only a timestamp.
+struct THeartbeatReq {
+ 1: required i64 heartbeatTimestamp
+}
+
struct TDataNodeLocation {
1: required i32 dataNodeId
// TEndPoint for DataNode's external rpc
@@ -67,4 +76,8 @@ struct TDataNodeLocation {
4: required TEndPoint dataBlockManagerEndPoint
// TEndPoint for DataNode's ConsensusLayer
5: required TEndPoint consensusEndPoint
+}
+
+// Response of InternalService.getHeartBeat. It currently carries only a timestamp.
+struct THeartbeatResp {
+ 1: required i64 heartbeatTimestamp
}
\ No newline at end of file
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift
b/thrift-confignode/src/main/thrift/confignode.thrift
index 653d9ac2d7..7fa76108e8 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -38,7 +38,7 @@ struct TGlobalConfig {
struct TDataNodeRegisterResp {
1: required common.TSStatus status
- 2: required list<TConfigNodeLocation> configNodeList
+ 2: required list<common.TConfigNodeLocation> configNodeList
3: optional i32 dataNodeId
4: optional TGlobalConfig globalConfig
}
@@ -157,13 +157,8 @@ struct TCheckUserPrivilegesReq{
}
// ConfigNode
-struct TConfigNodeLocation {
- 1: required common.TEndPoint internalEndPoint
- 2: required common.TEndPoint consensusEndPoint
-}
-
struct TConfigNodeRegisterReq {
- 1: required TConfigNodeLocation configNodeLocation
+ 1: required common.TConfigNodeLocation configNodeLocation
2: required string dataNodeConsensusProtocolClass
3: required i32 seriesPartitionSlotNum
4: required string seriesPartitionExecutorClass
@@ -176,7 +171,7 @@ struct TConfigNodeRegisterReq {
struct TConfigNodeRegisterResp {
1: required common.TSStatus status
2: optional common.TConsensusGroupId partitionRegionId
- 3: optional list<TConfigNodeLocation> configNodeList
+ 3: optional list<common.TConfigNodeLocation> configNodeList
}
service ConfigIService {
@@ -233,5 +228,5 @@ service ConfigIService {
TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req)
- common.TSStatus applyConfigNode(TConfigNodeLocation configNodeLocation)
+ common.TSStatus applyConfigNode(common.TConfigNodeLocation
configNodeLocation)
}
\ No newline at end of file
diff --git a/thrift/src/main/thrift/mpp.thrift
b/thrift/src/main/thrift/mpp.thrift
index 51be9a80b3..dfc26e4da0 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -204,6 +204,12 @@ service InternalService {
*/
common.TSStatus migrateDataRegion(TMigrateDataRegionReq req)
+ /**
+ * ConfigNode asks DataNode for a heartbeat every few seconds.
+ *
+ * @param req The heartbeat request. It is intended to carry the latest config_node_list and
+ *     load balancing policies; THeartbeatReq currently contains only heartbeatTimestamp.
+ **/
+ common.THeartbeatResp getHeartBeat(common.THeartbeatReq req)
}
service DataBlockService {