This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8e78fcae1f1 Use heartbeat to update DataNode's ConfigNodeList (#12232)
8e78fcae1f1 is described below
commit 8e78fcae1f114e0c21e166c79cb2e10d2c3ef4c3
Author: Li Yu Heng <[email protected]>
AuthorDate: Tue Mar 26 16:07:09 2024 +0800
Use heartbeat to update DataNode's ConfigNodeList (#12232)
---
.../iotdb/it/env/cluster/env/AbstractEnv.java | 9 ++++--
.../it/env/cluster/node/AbstractNodeWrapper.java | 2 +-
.../confignode/client/DataNodeRequestType.java | 1 -
.../client/async/AsyncDataNodeClientPool.java | 7 -----
.../client/async/handlers/AsyncClientHandler.java | 1 -
.../heartbeat/DataNodeHeartbeatHandler.java | 4 +++
.../confignode/manager/load/cache/LoadCache.java | 14 +++++++++
.../manager/load/service/HeartbeatService.java | 25 ++++++++++++++++
.../procedure/env/ConfigNodeProcedureEnv.java | 23 ---------------
.../impl/node/AddConfigNodeProcedure.java | 1 -
.../impl/node/RemoveConfigNodeProcedure.java | 1 -
.../iotdb/db/protocol/client/ConfigNodeInfo.java | 34 ++++++++++++++++++----
.../impl/DataNodeInternalRPCServiceImpl.java | 23 +++++----------
.../src/main/thrift/datanode.thrift | 13 ++-------
14 files changed, 89 insertions(+), 69 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 5296ddcdd3b..17086de3cb2 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -346,9 +346,14 @@ public abstract class AbstractEnv implements BaseEnv {
@Override
public void cleanClusterEnvironment() {
- for (AbstractNodeWrapper nodeWrapper :
+ List<AbstractNodeWrapper> allNodeWrappers =
Stream.concat(this.dataNodeWrapperList.stream(),
this.configNodeWrapperList.stream())
- .collect(Collectors.toList())) {
+ .collect(Collectors.toList());
+ allNodeWrappers.stream()
+ .findAny()
+ .ifPresent(
+ nodeWrapper -> logger.info("You can find logs at {}",
nodeWrapper.getLogDirPath()));
+ for (AbstractNodeWrapper nodeWrapper : allNodeWrappers) {
nodeWrapper.stopForcibly();
nodeWrapper.destroyDir();
String lockPath = EnvUtils.getLockFilePath(nodeWrapper.getPort());
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
index b1bd3236973..550635a69d9 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
@@ -531,7 +531,7 @@ public abstract class AbstractNodeWrapper implements
BaseNodeWrapper {
return getLogDirPath() + File.separator + getId() + ".log";
}
- protected String getLogDirPath() {
+ public String getLogDirPath() {
String baseDir =
System.getProperty(USER_DIR)
+ File.separator
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index 878e0c8e115..db0007f1a95 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -32,7 +32,6 @@ public enum DataNodeRequestType {
STOP_REPAIR_DATA,
LOAD_CONFIGURATION,
SET_SYSTEM_STATUS,
- BROADCAST_LATEST_CONFIG_NODE_GROUP,
// Region Maintenance
CREATE_DATA_REGION,
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index 4aaf5c2633d..82bfa5c1c26 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -73,7 +73,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListWithTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TRollbackViewSchemaBlackListReq;
-import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
@@ -343,12 +342,6 @@ public class AsyncDataNodeClientPool {
(AsyncTSStatusRPCHandler)
clientHandler.createAsyncRPCHandler(requestId,
targetDataNode));
break;
- case BROADCAST_LATEST_CONFIG_NODE_GROUP:
- client.updateConfigNodeGroup(
- (TUpdateConfigNodeGroupReq) clientHandler.getRequest(requestId),
- (AsyncTSStatusRPCHandler)
- clientHandler.createAsyncRPCHandler(requestId,
targetDataNode));
- break;
case CONSTRUCT_SCHEMA_BLACK_LIST:
client.constructSchemaBlackList(
(TConstructSchemaBlackListReq)
clientHandler.getRequest(requestId),
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
index 1622849ea45..27fcadbbd12 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
@@ -258,7 +258,6 @@ public class AsyncClientHandler<Q, R> {
case LOAD_CONFIGURATION:
case SET_SYSTEM_STATUS:
case UPDATE_REGION_ROUTE_MAP:
- case BROADCAST_LATEST_CONFIG_NODE_GROUP:
case INVALIDATE_MATCHED_SCHEMA_CACHE:
case UPDATE_TEMPLATE:
case CHANGE_REGION_LEADER:
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index 99649e7b715..6d8a292ebc8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -113,6 +113,10 @@ public class DataNodeHeartbeatHandler implements
AsyncMethodCallback<TDataNodeHe
if (heartbeatResp.getPipeMetaList() != null) {
pipeRuntimeCoordinator.parseHeartbeat(nodeId,
heartbeatResp.getPipeMetaList());
}
+ if (heartbeatResp.isSetConfirmedConfigNodeEndPoints()) {
+ loadCache.updateConfirmedConfigNodeEndPoints(
+ nodeId, heartbeatResp.getConfirmedConfigNodeEndPoints());
+ }
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index 9a237eb99c5..4d4551bea65 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.NodeType;
@@ -51,6 +52,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -74,11 +76,14 @@ public class LoadCache {
private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
// Map<RegionGroupId, RegionRouteCache>
private final Map<TConsensusGroupId, RegionRouteCache> regionRouteCacheMap;
+ // Map<DataNodeId, confirmedConfigNodes>
+ private final Map<Integer, Set<TEndPoint>> confirmedConfigNodeMap;
public LoadCache() {
this.nodeCacheMap = new ConcurrentHashMap<>();
this.regionGroupCacheMap = new ConcurrentHashMap<>();
this.regionRouteCacheMap = new ConcurrentHashMap<>();
+ this.confirmedConfigNodeMap = new ConcurrentHashMap<>();
}
public void initHeartbeatCache(IManager configManager) {
@@ -604,4 +609,13 @@ public class LoadCache {
public boolean existUnreadyRegionGroup() {
return
regionRouteCacheMap.values().stream().anyMatch(RegionRouteCache::isRegionGroupUnready);
}
+
+ public void updateConfirmedConfigNodeEndPoints(
+ int dataNodeId, Set<TEndPoint> configNodeEndPoints) {
+ confirmedConfigNodeMap.put(dataNodeId, configNodeEndPoints);
+ }
+
+ public Set<TEndPoint> getConfirmedConfigNodeEndPoints(int dataNodeId) {
+ return confirmedConfigNodeMap.get(dataNodeId);
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index ebc2ef4f9bd..7d5dff12ede 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.manager.load.service;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
@@ -44,10 +45,12 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
/** Maintain the Cluster-Heartbeat-Service. */
public class HeartbeatService {
@@ -69,6 +72,7 @@ public class HeartbeatService {
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
ThreadName.CONFIG_NODE_HEART_BEAT_SERVICE.getName());
private final AtomicLong heartbeatCounter = new AtomicLong(0);
+ private static final int configNodeListPeriodicallySyncInterval = 100;
public HeartbeatService(IManager configManager, LoadCache loadCache) {
this.configManager = configManager;
@@ -150,6 +154,26 @@ public class HeartbeatService {
return heartbeatReq;
}
+ private void addConfigNodeLocationsToReq(int dataNodeId,
TDataNodeHeartbeatReq req) {
+ Set<TEndPoint> confirmedConfigNodes =
loadCache.getConfirmedConfigNodeEndPoints(dataNodeId);
+ Set<TEndPoint> actualConfigNodes =
+ getNodeManager().getRegisteredConfigNodes().stream()
+ .map(TConfigNodeLocation::getInternalEndPoint)
+ .collect(Collectors.toSet());
+ /*
+ In most cases, comparing actualConfigNodes and confirmedConfigNodes is sufficient, but in some cases it's not, hence the need for periodic sending.
+ Here's an example:
+ 1. There are ConfigNode A and B, and one DataNode in the cluster. DataNode persists "ConfigNode list = [A,B]".
+ 2. ConfigNode B is removed. DataNode persists "ConfigNode list = [A]" but fails to confirm it to ConfigNode.
+ 3. ConfigNode B is re-added to the cluster.
+ 4. At this point, because actualConfigNodes and confirmedConfigNodes are identical, the ConfigNode list is not re-sent to the DataNode.
+ */
+ if (!actualConfigNodes.equals(confirmedConfigNodes)
+ || heartbeatCounter.get() % configNodeListPeriodicallySyncInterval ==
0) {
+ req.setConfigNodeEndPoints(actualConfigNodes);
+ }
+ }
+
private TConfigNodeHeartbeatReq genConfigNodeHeartbeatReq() {
TConfigNodeHeartbeatReq req = new TConfigNodeHeartbeatReq();
req.setTimestamp(System.nanoTime());
@@ -198,6 +222,7 @@ public class HeartbeatService {
configManager.getClusterSchemaManager()::updateDeviceUsage,
configManager.getPipeManager().getPipeRuntimeCoordinator());
configManager.getClusterQuotaManager().updateSpaceQuotaUsage();
+ addConfigNodeLocationsToReq(dataNodeInfo.getLocation().getDataNodeId(),
heartbeatReq);
AsyncDataNodeHeartbeatClientPool.getInstance()
.getDataNodeHeartBeat(
dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq,
handler);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index a8b596862d8..93990bb04d5 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -78,7 +78,6 @@ import
org.apache.iotdb.mpp.rpc.thrift.TPushSingleConsumerGroupMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushSinglePipeMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushSingleTopicMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaResp;
-import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -364,28 +363,6 @@ public class ConfigNodeProcedureEnv {
ConfigNodeRequestType.NOTIFY_REGISTER_SUCCESS);
}
- /** Notify all DataNodes when the capacity of the ConfigNodeGroup is expanded or reduced. */
- public void broadCastTheLatestConfigNodeGroup() {
- List<TConfigNodeLocation> registeredConfigNodes =
- configManager.getNodeManager().getRegisteredConfigNodes();
- Map<Integer, TDataNodeLocation> registeredDataNodes =
- configManager.getNodeManager().getRegisteredDataNodeLocations();
- AsyncClientHandler<TUpdateConfigNodeGroupReq, TSStatus> clientHandler =
- new AsyncClientHandler<>(
- DataNodeRequestType.BROADCAST_LATEST_CONFIG_NODE_GROUP,
- new TUpdateConfigNodeGroupReq(registeredConfigNodes),
- registeredDataNodes);
-
- if (!registeredDataNodes.isEmpty()) {
- LOG.info(
- "Begin to broadcast the latest configNodeGroup to DataNodes,
ConfigNodeGroups: {}, DataNodes: {}",
- registeredConfigNodes,
- registeredDataNodes.values());
-
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
- LOG.info("Broadcast the latest configNodeGroup to DataNodes finished.");
- }
- }
-
/**
* Mark the given datanode as removing status to avoid read or write request routing to this node.
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
index 01c7d82ec62..4fe23220893 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
@@ -85,7 +85,6 @@ public class AddConfigNodeProcedure extends
AbstractNodeProcedure<AddConfigNodeS
case REGISTER_SUCCESS:
env.notifyRegisterSuccess(tConfigNodeLocation);
env.applyConfigNode(tConfigNodeLocation, versionInfo);
- env.broadCastTheLatestConfigNodeGroup();
env.getConfigManager()
.getLoadManager()
.forceUpdateNodeCache(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveConfigNodeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveConfigNodeProcedure.java
index 037509fb8cf..0910f06605d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveConfigNodeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveConfigNodeProcedure.java
@@ -69,7 +69,6 @@ public class RemoveConfigNodeProcedure extends
AbstractNodeProcedure<RemoveConfi
LOG.info("Delete peer for ConfigNode: {}", removedConfigNode);
break;
case STOP_CONFIG_NODE:
- env.broadCastTheLatestConfigNodeGroup();
env.stopConfigNode(removedConfigNode);
LOG.info("Stop ConfigNode: {}", removedConfigNode);
return Flow.NO_MORE_STATE;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeInfo.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeInfo.java
index 68b6ae6b9c1..91050bc73b8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeInfo.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeInfo.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -56,6 +57,7 @@ public class ConfigNodeInfo {
public static final ConfigRegionId CONFIG_REGION_ID = new ConfigRegionId(0);
private final File propertiesFile;
+ private final File propertiesFileTmp;
private ConfigNodeInfo() {
this.configNodeInfoReadWriteLock = new ReentrantReadWriteLock();
@@ -65,6 +67,15 @@ public class ConfigNodeInfo {
IoTDBDescriptor.getInstance().getConfig().getSystemDir()
+ File.separator
+ PROPERTIES_FILE_NAME);
+ propertiesFileTmp =
+ SystemFileFactory.INSTANCE.getFile(propertiesFile.getAbsolutePath() +
".tmp");
+ if (propertiesFileTmp.exists()) {
+ try {
+ updatePropertiesFile();
+ } catch (IOException e) {
+ logger.error("Update properties file fail", e);
+ }
+ }
}
// TODO: This needs removal of statics ...
public static void reinitializeStatics() {
@@ -72,14 +83,14 @@ public class ConfigNodeInfo {
}
/** Update ConfigNodeList both in memory and system.properties file */
- public void updateConfigNodeList(List<TEndPoint> latestConfigNodes) {
+ public boolean updateConfigNodeList(List<TEndPoint> latestConfigNodes) {
long startTime = System.currentTimeMillis();
// Check whether the config nodes are latest or not
configNodeInfoReadWriteLock.readLock().lock();
try {
- if (onlineConfigNodes.containsAll(latestConfigNodes)
- && new HashSet<>(latestConfigNodes).containsAll(onlineConfigNodes)) {
- return;
+ if (onlineConfigNodes.size() == latestConfigNodes.size()
+ && onlineConfigNodes.containsAll(latestConfigNodes)) {
+ return true;
}
} finally {
configNodeInfoReadWriteLock.readLock().unlock();
@@ -98,9 +109,11 @@ public class ConfigNodeInfo {
(endTime - startTime));
} catch (IOException e) {
logger.error("Update ConfigNode failed.", e);
+ return false;
} finally {
configNodeInfoReadWriteLock.writeLock().unlock();
}
+ return true;
}
/**
@@ -115,9 +128,20 @@ public class ConfigNodeInfo {
}
properties.setProperty(
CONFIG_NODE_LIST, NodeUrlUtils.convertTEndPointUrls(new
ArrayList<>(onlineConfigNodes)));
- try (FileOutputStream fileOutputStream = new
FileOutputStream(propertiesFile)) {
+ try (FileOutputStream fileOutputStream = new
FileOutputStream(propertiesFileTmp)) {
properties.store(fileOutputStream, "");
}
+ updatePropertiesFile();
+ }
+
+ private void updatePropertiesFile() throws IOException {
+ if (!propertiesFile.delete()) {
+ String msg =
+ String.format(
+ "Update %s file fail: %s", PROPERTIES_FILE_NAME,
propertiesFile.getAbsoluteFile());
+ throw new IOException(msg);
+ }
+ FileUtils.moveFileSafe(propertiesFileTmp, propertiesFile);
}
public void loadConfigNodeList() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 777a6298106..d2e57c6bbb6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.protocol.thrift.impl;
-import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
@@ -212,7 +211,6 @@ import
org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
import org.apache.iotdb.mpp.rpc.thrift.TSendSinglePlanNodeResp;
import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
import org.apache.iotdb.rpc.RpcUtils;
@@ -1310,6 +1308,13 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
PipeAgent.task().collectPipeMetaList(resp);
}
+ if (req.isSetConfigNodeEndPoints()) {
+ if (ConfigNodeInfo.getInstance()
+ .updateConfigNodeList(new
ArrayList<>(req.getConfigNodeEndPoints()))) {
+ resp.setConfirmedConfigNodeEndPoints(req.getConfigNodeEndPoints());
+ }
+ }
+
return resp;
}
@@ -1542,20 +1547,6 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
return storageEngine.setTTL(req);
}
- @Override
- public TSStatus updateConfigNodeGroup(TUpdateConfigNodeGroupReq req) {
- List<TConfigNodeLocation> configNodeLocations =
req.getConfigNodeLocations();
- if (configNodeLocations != null) {
- ConfigNodeInfo.getInstance()
- .updateConfigNodeList(
- configNodeLocations
- .parallelStream()
- .map(TConfigNodeLocation::getInternalEndPoint)
- .collect(Collectors.toList()));
- }
- return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
- }
-
@Override
public TSStatus updateTemplate(TUpdateTemplateReq req) {
switch (TemplateInternalRPCUpdateType.getType(req.type)) {
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 1d818fad3c0..ddc14327a16 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -252,6 +252,7 @@ struct TDataNodeHeartbeatReq {
8: optional bool needPipeMetaList
9: optional i64 deviceQuotaRemain
10: optional TDataNodeActivation activation
+ 11: optional set<common.TEndPoint> configNodeEndPoints
}
struct TDataNodeActivation {
@@ -273,6 +274,7 @@ struct TDataNodeHeartbeatResp {
9: optional TSchemaLimitLevel schemaLimitLevel
10: optional list<binary> pipeMetaList
11: optional string activateStatus
+ 12: optional set<common.TEndPoint> confirmedConfigNodeEndPoints
}
struct TPipeHeartbeatReq {
@@ -305,10 +307,6 @@ struct TRegionRouteReq {
2: required map<common.TConsensusGroupId, common.TRegionReplicaSet>
regionRouteMap
}
-struct TUpdateConfigNodeGroupReq {
- 1: required list<common.TConfigNodeLocation> configNodeLocations
-}
-
struct TUpdateTemplateReq {
1: required byte type
2: required binary templateInfo
@@ -803,13 +801,6 @@ service IDataNodeRPCService {
* Config node will Set the TTL for the database on a list of data nodes.
*/
common.TSStatus setTTL(common.TSetTTLReq req)
-
- /**
- * configNode will notify all DataNodes when the capacity of the ConfigNodeGroup is expanded or reduced
- *
- * @param list<common.TConfigNodeLocation> configNodeLocations
- */
- common.TSStatus updateConfigNodeGroup(TUpdateConfigNodeGroupReq req)
/**
* Update template cache when template info or template set info is updated