This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e78fcae1f1 Use heartbeat to update DataNode's ConfigNodeList (#12232)
8e78fcae1f1 is described below

commit 8e78fcae1f114e0c21e166c79cb2e10d2c3ef4c3
Author: Li Yu Heng <[email protected]>
AuthorDate: Tue Mar 26 16:07:09 2024 +0800

    Use heartbeat to update DataNode's ConfigNodeList (#12232)
---
 .../iotdb/it/env/cluster/env/AbstractEnv.java      |  9 ++++--
 .../it/env/cluster/node/AbstractNodeWrapper.java   |  2 +-
 .../confignode/client/DataNodeRequestType.java     |  1 -
 .../client/async/AsyncDataNodeClientPool.java      |  7 -----
 .../client/async/handlers/AsyncClientHandler.java  |  1 -
 .../heartbeat/DataNodeHeartbeatHandler.java        |  4 +++
 .../confignode/manager/load/cache/LoadCache.java   | 14 +++++++++
 .../manager/load/service/HeartbeatService.java     | 25 ++++++++++++++++
 .../procedure/env/ConfigNodeProcedureEnv.java      | 23 ---------------
 .../impl/node/AddConfigNodeProcedure.java          |  1 -
 .../impl/node/RemoveConfigNodeProcedure.java       |  1 -
 .../iotdb/db/protocol/client/ConfigNodeInfo.java   | 34 ++++++++++++++++++----
 .../impl/DataNodeInternalRPCServiceImpl.java       | 23 +++++----------
 .../src/main/thrift/datanode.thrift                | 13 ++-------
 14 files changed, 89 insertions(+), 69 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 5296ddcdd3b..17086de3cb2 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -346,9 +346,14 @@ public abstract class AbstractEnv implements BaseEnv {
 
   @Override
   public void cleanClusterEnvironment() {
-    for (AbstractNodeWrapper nodeWrapper :
+    List<AbstractNodeWrapper> allNodeWrappers =
         Stream.concat(this.dataNodeWrapperList.stream(), 
this.configNodeWrapperList.stream())
-            .collect(Collectors.toList())) {
+            .collect(Collectors.toList());
+    allNodeWrappers.stream()
+        .findAny()
+        .ifPresent(
+            nodeWrapper -> logger.info("You can find logs at {}", 
nodeWrapper.getLogDirPath()));
+    for (AbstractNodeWrapper nodeWrapper : allNodeWrappers) {
       nodeWrapper.stopForcibly();
       nodeWrapper.destroyDir();
       String lockPath = EnvUtils.getLockFilePath(nodeWrapper.getPort());
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
index b1bd3236973..550635a69d9 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
@@ -531,7 +531,7 @@ public abstract class AbstractNodeWrapper implements 
BaseNodeWrapper {
     return getLogDirPath() + File.separator + getId() + ".log";
   }
 
-  protected String getLogDirPath() {
+  public String getLogDirPath() {
     String baseDir =
         System.getProperty(USER_DIR)
             + File.separator
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index 878e0c8e115..db0007f1a95 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -32,7 +32,6 @@ public enum DataNodeRequestType {
   STOP_REPAIR_DATA,
   LOAD_CONFIGURATION,
   SET_SYSTEM_STATUS,
-  BROADCAST_LATEST_CONFIG_NODE_GROUP,
 
   // Region Maintenance
   CREATE_DATA_REGION,
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index 4aaf5c2633d..82bfa5c1c26 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -73,7 +73,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListWithTemplateReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRollbackViewSchemaBlackListReq;
-import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
 
@@ -343,12 +342,6 @@ public class AsyncDataNodeClientPool {
               (AsyncTSStatusRPCHandler)
                   clientHandler.createAsyncRPCHandler(requestId, 
targetDataNode));
           break;
-        case BROADCAST_LATEST_CONFIG_NODE_GROUP:
-          client.updateConfigNodeGroup(
-              (TUpdateConfigNodeGroupReq) clientHandler.getRequest(requestId),
-              (AsyncTSStatusRPCHandler)
-                  clientHandler.createAsyncRPCHandler(requestId, 
targetDataNode));
-          break;
         case CONSTRUCT_SCHEMA_BLACK_LIST:
           client.constructSchemaBlackList(
               (TConstructSchemaBlackListReq) 
clientHandler.getRequest(requestId),
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
index 1622849ea45..27fcadbbd12 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
@@ -258,7 +258,6 @@ public class AsyncClientHandler<Q, R> {
       case LOAD_CONFIGURATION:
       case SET_SYSTEM_STATUS:
       case UPDATE_REGION_ROUTE_MAP:
-      case BROADCAST_LATEST_CONFIG_NODE_GROUP:
       case INVALIDATE_MATCHED_SCHEMA_CACHE:
       case UPDATE_TEMPLATE:
       case CHANGE_REGION_LEADER:
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index 99649e7b715..6d8a292ebc8 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -113,6 +113,10 @@ public class DataNodeHeartbeatHandler implements 
AsyncMethodCallback<TDataNodeHe
     if (heartbeatResp.getPipeMetaList() != null) {
       pipeRuntimeCoordinator.parseHeartbeat(nodeId, 
heartbeatResp.getPipeMetaList());
     }
+    if (heartbeatResp.isSetConfirmedConfigNodeEndPoints()) {
+      loadCache.updateConfirmedConfigNodeEndPoints(
+          nodeId, heartbeatResp.getConfirmedConfigNodeEndPoints());
+    }
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index 9a237eb99c5..4d4551bea65 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.cluster.NodeType;
@@ -51,6 +52,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -74,11 +76,14 @@ public class LoadCache {
   private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
   // Map<RegionGroupId, RegionRouteCache>
   private final Map<TConsensusGroupId, RegionRouteCache> regionRouteCacheMap;
+  // Map<DataNodeId, confirmedConfigNodes>
+  private final Map<Integer, Set<TEndPoint>> confirmedConfigNodeMap;
 
   public LoadCache() {
     this.nodeCacheMap = new ConcurrentHashMap<>();
     this.regionGroupCacheMap = new ConcurrentHashMap<>();
     this.regionRouteCacheMap = new ConcurrentHashMap<>();
+    this.confirmedConfigNodeMap = new ConcurrentHashMap<>();
   }
 
   public void initHeartbeatCache(IManager configManager) {
@@ -604,4 +609,13 @@ public class LoadCache {
   public boolean existUnreadyRegionGroup() {
     return 
regionRouteCacheMap.values().stream().anyMatch(RegionRouteCache::isRegionGroupUnready);
   }
+
+  public void updateConfirmedConfigNodeEndPoints(
+      int dataNodeId, Set<TEndPoint> configNodeEndPoints) {
+    confirmedConfigNodeMap.put(dataNodeId, configNodeEndPoints);
+  }
+
+  public Set<TEndPoint> getConfirmedConfigNodeEndPoints(int dataNodeId) {
+    return confirmedConfigNodeMap.get(dataNodeId);
+  }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index ebc2ef4f9bd..7d5dff12ede 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.manager.load.service;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
@@ -44,10 +45,12 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /** Maintain the Cluster-Heartbeat-Service. */
 public class HeartbeatService {
@@ -69,6 +72,7 @@ public class HeartbeatService {
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
           ThreadName.CONFIG_NODE_HEART_BEAT_SERVICE.getName());
   private final AtomicLong heartbeatCounter = new AtomicLong(0);
+  private static final int configNodeListPeriodicallySyncInterval = 100;
 
   public HeartbeatService(IManager configManager, LoadCache loadCache) {
     this.configManager = configManager;
@@ -150,6 +154,26 @@ public class HeartbeatService {
     return heartbeatReq;
   }
 
+  private void addConfigNodeLocationsToReq(int dataNodeId, 
TDataNodeHeartbeatReq req) {
+    Set<TEndPoint> confirmedConfigNodes = 
loadCache.getConfirmedConfigNodeEndPoints(dataNodeId);
+    Set<TEndPoint> actualConfigNodes =
+        getNodeManager().getRegisteredConfigNodes().stream()
+            .map(TConfigNodeLocation::getInternalEndPoint)
+            .collect(Collectors.toSet());
+    /*
+      In most cases, comparing actualConfigNodes and confirmedConfigNodes is sufficient, but in some cases it's not, hence the need for periodic sending.
+      Here's an example:
+      1. There are ConfigNode A and B, and one DataNode in the cluster. DataNode persists "ConfigNode list = [A,B]".
+      2. ConfigNode B is removed. DataNode persists "ConfigNode list = [A]" but fails to confirm it to ConfigNode.
+      3. ConfigNode B is re-added to the cluster.
+      4. At this point, because actualConfigNodes and confirmedConfigNodes are identical, the ConfigNode list is not re-sent to the DataNode.
+    */
+    if (!actualConfigNodes.equals(confirmedConfigNodes)
+        || heartbeatCounter.get() % configNodeListPeriodicallySyncInterval == 
0) {
+      req.setConfigNodeEndPoints(actualConfigNodes);
+    }
+  }
+
   private TConfigNodeHeartbeatReq genConfigNodeHeartbeatReq() {
     TConfigNodeHeartbeatReq req = new TConfigNodeHeartbeatReq();
     req.setTimestamp(System.nanoTime());
@@ -198,6 +222,7 @@ public class HeartbeatService {
               configManager.getClusterSchemaManager()::updateDeviceUsage,
               configManager.getPipeManager().getPipeRuntimeCoordinator());
       configManager.getClusterQuotaManager().updateSpaceQuotaUsage();
+      addConfigNodeLocationsToReq(dataNodeInfo.getLocation().getDataNodeId(), 
heartbeatReq);
       AsyncDataNodeHeartbeatClientPool.getInstance()
           .getDataNodeHeartBeat(
               dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, 
handler);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index a8b596862d8..93990bb04d5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -78,7 +78,6 @@ import 
org.apache.iotdb.mpp.rpc.thrift.TPushSingleConsumerGroupMetaReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPushSinglePipeMetaReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPushSingleTopicMetaReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaResp;
-import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Binary;
 
@@ -364,28 +363,6 @@ public class ConfigNodeProcedureEnv {
             ConfigNodeRequestType.NOTIFY_REGISTER_SUCCESS);
   }
 
-  /** Notify all DataNodes when the capacity of the ConfigNodeGroup is expanded or reduced. */
-  public void broadCastTheLatestConfigNodeGroup() {
-    List<TConfigNodeLocation> registeredConfigNodes =
-        configManager.getNodeManager().getRegisteredConfigNodes();
-    Map<Integer, TDataNodeLocation> registeredDataNodes =
-        configManager.getNodeManager().getRegisteredDataNodeLocations();
-    AsyncClientHandler<TUpdateConfigNodeGroupReq, TSStatus> clientHandler =
-        new AsyncClientHandler<>(
-            DataNodeRequestType.BROADCAST_LATEST_CONFIG_NODE_GROUP,
-            new TUpdateConfigNodeGroupReq(registeredConfigNodes),
-            registeredDataNodes);
-
-    if (!registeredDataNodes.isEmpty()) {
-      LOG.info(
-          "Begin to broadcast the latest configNodeGroup to DataNodes, 
ConfigNodeGroups: {}, DataNodes: {}",
-          registeredConfigNodes,
-          registeredDataNodes.values());
-      
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
-      LOG.info("Broadcast the latest configNodeGroup to DataNodes finished.");
-    }
-  }
-
   /**
    * Mark the given datanode as removing status to avoid read or write request routing to this node.
    *
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
index 01c7d82ec62..4fe23220893 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java
@@ -85,7 +85,6 @@ public class AddConfigNodeProcedure extends 
AbstractNodeProcedure<AddConfigNodeS
         case REGISTER_SUCCESS:
           env.notifyRegisterSuccess(tConfigNodeLocation);
           env.applyConfigNode(tConfigNodeLocation, versionInfo);
-          env.broadCastTheLatestConfigNodeGroup();
           env.getConfigManager()
               .getLoadManager()
               .forceUpdateNodeCache(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveConfigNodeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveConfigNodeProcedure.java
index 037509fb8cf..0910f06605d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveConfigNodeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveConfigNodeProcedure.java
@@ -69,7 +69,6 @@ public class RemoveConfigNodeProcedure extends 
AbstractNodeProcedure<RemoveConfi
           LOG.info("Delete peer for ConfigNode: {}", removedConfigNode);
           break;
         case STOP_CONFIG_NODE:
-          env.broadCastTheLatestConfigNodeGroup();
           env.stopConfigNode(removedConfigNode);
           LOG.info("Stop ConfigNode: {}", removedConfigNode);
           return Flow.NO_MORE_STATE;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeInfo.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeInfo.java
index 68b6ae6b9c1..91050bc73b8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeInfo.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeInfo.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.consensus.ConfigRegionId;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 
@@ -56,6 +57,7 @@ public class ConfigNodeInfo {
   public static final ConfigRegionId CONFIG_REGION_ID = new ConfigRegionId(0);
 
   private final File propertiesFile;
+  private final File propertiesFileTmp;
 
   private ConfigNodeInfo() {
     this.configNodeInfoReadWriteLock = new ReentrantReadWriteLock();
@@ -65,6 +67,15 @@ public class ConfigNodeInfo {
             IoTDBDescriptor.getInstance().getConfig().getSystemDir()
                 + File.separator
                 + PROPERTIES_FILE_NAME);
+    propertiesFileTmp =
+        SystemFileFactory.INSTANCE.getFile(propertiesFile.getAbsolutePath() + 
".tmp");
+    if (propertiesFileTmp.exists()) {
+      try {
+        updatePropertiesFile();
+      } catch (IOException e) {
+        logger.error("Update properties file fail", e);
+      }
+    }
   }
   // TODO: This needs removal of statics ...
   public static void reinitializeStatics() {
@@ -72,14 +83,14 @@ public class ConfigNodeInfo {
   }
 
   /** Update ConfigNodeList both in memory and system.properties file */
-  public void updateConfigNodeList(List<TEndPoint> latestConfigNodes) {
+  public boolean updateConfigNodeList(List<TEndPoint> latestConfigNodes) {
     long startTime = System.currentTimeMillis();
     // Check whether the config nodes are latest or not
     configNodeInfoReadWriteLock.readLock().lock();
     try {
-      if (onlineConfigNodes.containsAll(latestConfigNodes)
-          && new HashSet<>(latestConfigNodes).containsAll(onlineConfigNodes)) {
-        return;
+      if (onlineConfigNodes.size() == latestConfigNodes.size()
+          && onlineConfigNodes.containsAll(latestConfigNodes)) {
+        return true;
       }
     } finally {
       configNodeInfoReadWriteLock.readLock().unlock();
@@ -98,9 +109,11 @@ public class ConfigNodeInfo {
           (endTime - startTime));
     } catch (IOException e) {
       logger.error("Update ConfigNode failed.", e);
+      return false;
     } finally {
       configNodeInfoReadWriteLock.writeLock().unlock();
     }
+    return true;
   }
 
   /**
@@ -115,9 +128,20 @@ public class ConfigNodeInfo {
     }
     properties.setProperty(
         CONFIG_NODE_LIST, NodeUrlUtils.convertTEndPointUrls(new 
ArrayList<>(onlineConfigNodes)));
-    try (FileOutputStream fileOutputStream = new 
FileOutputStream(propertiesFile)) {
+    try (FileOutputStream fileOutputStream = new 
FileOutputStream(propertiesFileTmp)) {
       properties.store(fileOutputStream, "");
     }
+    updatePropertiesFile();
+  }
+
+  private void updatePropertiesFile() throws IOException {
+    if (!propertiesFile.delete()) {
+      String msg =
+          String.format(
+              "Update %s file fail: %s", PROPERTIES_FILE_NAME, 
propertiesFile.getAbsoluteFile());
+      throw new IOException(msg);
+    }
+    FileUtils.moveFileSafe(propertiesFileTmp, propertiesFile);
   }
 
   public void loadConfigNodeList() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 777a6298106..d2e57c6bbb6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.protocol.thrift.impl;
 
-import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
@@ -212,7 +211,6 @@ import 
org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
 import org.apache.iotdb.mpp.rpc.thrift.TSendSinglePlanNodeResp;
 import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -1310,6 +1308,13 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
       PipeAgent.task().collectPipeMetaList(resp);
     }
 
+    if (req.isSetConfigNodeEndPoints()) {
+      if (ConfigNodeInfo.getInstance()
+          .updateConfigNodeList(new 
ArrayList<>(req.getConfigNodeEndPoints()))) {
+        resp.setConfirmedConfigNodeEndPoints(req.getConfigNodeEndPoints());
+      }
+    }
+
     return resp;
   }
 
@@ -1542,20 +1547,6 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     return storageEngine.setTTL(req);
   }
 
-  @Override
-  public TSStatus updateConfigNodeGroup(TUpdateConfigNodeGroupReq req) {
-    List<TConfigNodeLocation> configNodeLocations = 
req.getConfigNodeLocations();
-    if (configNodeLocations != null) {
-      ConfigNodeInfo.getInstance()
-          .updateConfigNodeList(
-              configNodeLocations
-                  .parallelStream()
-                  .map(TConfigNodeLocation::getInternalEndPoint)
-                  .collect(Collectors.toList()));
-    }
-    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
-  }
-
   @Override
   public TSStatus updateTemplate(TUpdateTemplateReq req) {
     switch (TemplateInternalRPCUpdateType.getType(req.type)) {
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 1d818fad3c0..ddc14327a16 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -252,6 +252,7 @@ struct TDataNodeHeartbeatReq {
   8: optional bool needPipeMetaList
   9: optional i64 deviceQuotaRemain
   10: optional TDataNodeActivation activation
+  11: optional set<common.TEndPoint> configNodeEndPoints
 }
 
 struct TDataNodeActivation {
@@ -273,6 +274,7 @@ struct TDataNodeHeartbeatResp {
   9: optional TSchemaLimitLevel schemaLimitLevel
   10: optional list<binary> pipeMetaList
   11: optional string activateStatus
+  12: optional set<common.TEndPoint> confirmedConfigNodeEndPoints
 }
 
 struct TPipeHeartbeatReq {
@@ -305,10 +307,6 @@ struct TRegionRouteReq {
   2: required map<common.TConsensusGroupId, common.TRegionReplicaSet> 
regionRouteMap
 }
 
-struct TUpdateConfigNodeGroupReq {
-  1: required list<common.TConfigNodeLocation> configNodeLocations
-}
-
 struct TUpdateTemplateReq {
   1: required byte type
   2: required binary templateInfo
@@ -803,13 +801,6 @@ service IDataNodeRPCService {
    * Config node will Set the TTL for the database on a list of data nodes.
    */
   common.TSStatus setTTL(common.TSetTTLReq req)
-  
-  /**
-   * configNode will notify all DataNodes when the capacity of the ConfigNodeGroup is expanded or reduced
-   *
-   * @param list<common.TConfigNodeLocation> configNodeLocations
-   */
-  common.TSStatus updateConfigNodeGroup(TUpdateConfigNodeGroupReq req)
 
   /**
    * Update template cache when template info or template set info is updated

Reply via email to