This is an automated email from the ASF dual-hosted git repository.

CRZbulabula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d4be5c82e0f Improve TopologyService and HeartbeatService scalability for large clusters (#17595)
d4be5c82e0f is described below

commit d4be5c82e0f8ba08b65484d529998a52591a7a5b
Author: Yongzao <[email protected]>
AuthorDate: Fri May 8 15:56:11 2026 +0800

    Improve TopologyService and HeartbeatService scalability for large clusters (#17595)
---
 .../it/env/cluster/config/MppCommonConfig.java     |   6 +
 .../env/cluster/config/MppSharedCommonConfig.java  |   7 +
 .../it/env/remote/config/RemoteCommonConfig.java   |   5 +
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |   2 +
 ...ableAggregationQueryWithNetworkPartitionIT.java |   3 +-
 .../client/async/CnToDnAsyncRequestType.java       |   1 +
 .../CnToDnInternalServiceAsyncRequestManager.java  |   6 +
 .../rpc/DataNodeAsyncRequestRPCHandler.java        |   1 +
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  33 +++
 .../confignode/conf/ConfigNodeDescriptor.java      |  31 +++
 .../statemachine/ConfigRegionStateMachine.java     |   5 +
 .../iotdb/confignode/manager/ConfigManager.java    |  14 ++
 .../iotdb/confignode/manager/load/LoadManager.java |   8 +-
 .../confignode/manager/load/cache/LoadCache.java   |   2 +-
 .../manager/load/service/HeartbeatService.java     |   8 -
 .../manager/load/service/TopologyService.java      | 250 ++++++++++++++-------
 .../impl/DataNodeInternalRPCServiceImpl.java       |  61 ++++-
 .../iotdb/db/queryengine/plan/ClusterTopology.java | 109 +++++----
 .../conf/iotdb-system.properties.template          |  15 ++
 .../iotdb/commons/concurrent/ThreadName.java       |   1 +
 .../src/main/thrift/datanode.thrift                |  11 +
 21 files changed, 422 insertions(+), 157 deletions(-)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 5d1ba8fc805..1578e8a3fbc 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -664,6 +664,12 @@ public class MppCommonConfig extends MppBaseConfig 
implements CommonConfig {
     return this;
   }
 
+  @Override
+  public CommonConfig setEnableTopologyProbing(boolean enableTopologyProbing) {
+    setProperty("enable_topology_probing", 
String.valueOf(enableTopologyProbing));
+    return this;
+  }
+
   // For part of the log directory
   public String getClusterConfigStr() {
     return 
fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS))
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index 29262c3e06b..c7d8becb373 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -699,4 +699,11 @@ public class MppSharedCommonConfig implements CommonConfig 
{
     cnConfig.setMaxRowsInCteBuffer(maxRows);
     return this;
   }
+
+  @Override
+  public CommonConfig setEnableTopologyProbing(boolean enableTopologyProbing) {
+    dnConfig.setEnableTopologyProbing(enableTopologyProbing);
+    cnConfig.setEnableTopologyProbing(enableTopologyProbing);
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index af48002c3a1..4e0b0d2d727 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -487,4 +487,9 @@ public class RemoteCommonConfig implements CommonConfig {
   public CommonConfig setMaxRowsInCteBuffer(int maxRows) {
     return this;
   }
+
+  @Override
+  public CommonConfig setEnableTopologyProbing(boolean enableTopologyProbing) {
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 09fb5545ebd..fb4c4f56ba5 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -215,4 +215,6 @@ public interface CommonConfig {
   CommonConfig setCteBufferSize(long cteBufferSize);
 
   CommonConfig setMaxRowsInCteBuffer(int maxRows);
+
+  CommonConfig setEnableTopologyProbing(boolean enableTopologyProbing);
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationQueryWithNetworkPartitionIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationQueryWithNetworkPartitionIT.java
index cba551ee74a..bb00451ea8f 100644
--- a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationQueryWithNetworkPartitionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationQueryWithNetworkPartitionIT.java
@@ -76,7 +76,8 @@ public class IoTDBTableAggregationQueryWithNetworkPartitionIT 
{
         .setSchemaReplicationFactor(testReplicationFactor)
         .setDataReplicationFactor(testReplicationFactor)
         .setTimePartitionInterval(testTimePartitionInterval)
-        
.setDefaultDataRegionGroupNumPerDatabase(testDataRegionGroupPerDatabase);
+        
.setDefaultDataRegionGroupNumPerDatabase(testDataRegionGroupPerDatabase)
+        .setEnableTopologyProbing(true);
     EnvFactory.getEnv().initClusterEnvironment(1, 3);
     prepareTableData(createSqls);
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
index e5753bf1bd1..e592ef8e5cb 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
@@ -34,6 +34,7 @@ public enum CnToDnAsyncRequestType {
   SUBMIT_TEST_CONNECTION_TASK,
   SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK,
   TEST_CONNECTION,
+  PUSH_TOPOLOGY,
 
   // Region Maintenance
   CREATE_DATA_REGION,
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
index 8c7b389bd3d..cc4a6134eb1 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
@@ -101,6 +101,7 @@ import 
org.apache.iotdb.mpp.rpc.thrift.TRollbackViewSchemaBlackListReq;
 import 
org.apache.iotdb.mpp.rpc.thrift.TTableDeviceDeletionWithPatternAndFilterReq;
 import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceDeletionWithPatternOrModReq;
 import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceInvalidateCacheReq;
+import org.apache.iotdb.mpp.rpc.thrift.TUpdateClusterTopologyReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateTableReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
@@ -412,6 +413,11 @@ public class CnToDnInternalServiceAsyncRequestManager
         CnToDnAsyncRequestType.TEST_CONNECTION,
         (req, client, handler) ->
             client.testConnectionEmptyRPC((DataNodeTSStatusRPCHandler) 
handler));
+    actionMapBuilder.put(
+        CnToDnAsyncRequestType.PUSH_TOPOLOGY,
+        (req, client, handler) ->
+            client.updateClusterTopology(
+                (TUpdateClusterTopologyReq) req, (DataNodeTSStatusRPCHandler) 
handler));
     actionMapBuilder.put(
         CnToDnAsyncRequestType.INSERT_RECORD,
         (req, client, handler) ->
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
index b2e2ec32327..ae651d1d9fc 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
@@ -202,6 +202,7 @@ public abstract class 
DataNodeAsyncRequestRPCHandler<Response>
             dataNodeLocationMap,
             (Map<Integer, TExternalServiceListResp>) responseMap,
             countDownLatch);
+      case PUSH_TOPOLOGY:
       case SET_TTL:
       case CREATE_DATA_REGION:
       case CREATE_SCHEMA_REGION:
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 64167ba1d2e..22a337dde0f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -215,6 +215,15 @@ public class ConfigNodeConfig {
   /** Acceptable pause duration for Phi accrual failure detector */
   private long failureDetectorPhiAcceptablePauseInMs = 10000;
 
+  /** Whether to enable topology probing between DataNodes. Supports 
hot-reload. */
+  private volatile boolean enableTopologyProbing = false;
+
+  /** Base interval in ms for topology probing. */
+  private long topologyProbingBaseIntervalInMs = 5000;
+
+  /** Ratio of probing timeout to probing interval (must be less than 1.0). */
+  private double topologyProbingTimeoutRatio = 0.5;
+
   /** The policy of cluster RegionGroups' leader distribution. */
   private String leaderDistributionPolicy = AbstractLeaderBalancer.CFD_POLICY;
 
@@ -1316,4 +1325,28 @@ public class ConfigNodeConfig {
   public void setFailureDetectorPhiAcceptablePauseInMs(long 
failureDetectorPhiAcceptablePauseInMs) {
     this.failureDetectorPhiAcceptablePauseInMs = 
failureDetectorPhiAcceptablePauseInMs;
   }
+
+  public boolean isEnableTopologyProbing() {
+    return enableTopologyProbing;
+  }
+
+  public void setEnableTopologyProbing(boolean enableTopologyProbing) {
+    this.enableTopologyProbing = enableTopologyProbing;
+  }
+
+  public long getTopologyProbingBaseIntervalInMs() {
+    return topologyProbingBaseIntervalInMs;
+  }
+
+  public void setTopologyProbingBaseIntervalInMs(long 
topologyProbingBaseIntervalInMs) {
+    this.topologyProbingBaseIntervalInMs = topologyProbingBaseIntervalInMs;
+  }
+
+  public double getTopologyProbingTimeoutRatio() {
+    return topologyProbingTimeoutRatio;
+  }
+
+  public void setTopologyProbingTimeoutRatio(double 
topologyProbingTimeoutRatio) {
+    this.topologyProbingTimeoutRatio = topologyProbingTimeoutRatio;
+  }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index fa35565ff51..bce0d4b215a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -340,6 +340,35 @@ public class ConfigNodeDescriptor {
                 "failure_detector_phi_acceptable_pause_in_ms",
                 
String.valueOf(conf.getFailureDetectorPhiAcceptablePauseInMs()))));
 
+    conf.setEnableTopologyProbing(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_topology_probing", 
String.valueOf(conf.isEnableTopologyProbing()))));
+
+    long topologyProbingBaseIntervalInMs =
+        Long.parseLong(
+            properties.getProperty(
+                "topology_probing_base_interval_in_ms",
+                String.valueOf(conf.getTopologyProbingBaseIntervalInMs())));
+    if (topologyProbingBaseIntervalInMs <= 0) {
+      throw new IOException(
+          "topology_probing_base_interval_in_ms must be positive, but got: "
+              + topologyProbingBaseIntervalInMs);
+    }
+    conf.setTopologyProbingBaseIntervalInMs(topologyProbingBaseIntervalInMs);
+
+    double topologyProbingTimeoutRatio =
+        Double.parseDouble(
+            properties.getProperty(
+                "topology_probing_timeout_ratio",
+                String.valueOf(conf.getTopologyProbingTimeoutRatio())));
+    if (topologyProbingTimeoutRatio <= 0 || topologyProbingTimeoutRatio >= 
1.0) {
+      throw new IOException(
+          "topology_probing_timeout_ratio must be in (0, 1), but got: "
+              + topologyProbingTimeoutRatio);
+    }
+    conf.setTopologyProbingTimeoutRatio(topologyProbingTimeoutRatio);
+
     String leaderDistributionPolicy =
         properties.getProperty("leader_distribution_policy", 
conf.getLeaderDistributionPolicy());
     if (AbstractLeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy)
@@ -791,6 +820,8 @@ public class ConfigNodeDescriptor {
     ConfigurationFileUtils.updateAppliedProperties(properties, true);
     Optional.ofNullable(properties.getProperty(IoTDBConstant.CLUSTER_NAME))
         .ifPresent(conf::setClusterName);
+    Optional.ofNullable(properties.getProperty("enable_topology_probing"))
+        .ifPresent(v -> 
conf.setEnableTopologyProbing(Boolean.parseBoolean(v)));
   }
 
   public static ConfigNodeDescriptor getInstance() {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index effb1c466ba..9c0f64873c4 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -256,6 +256,7 @@ public class ConfigRegionStateMachine implements 
IStateMachine, IStateMachine.Ev
     
configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync();
     
configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat();
     
configManager.getSubscriptionManager().getSubscriptionCoordinator().stopSubscriptionMetaSync();
+    configManager.getLoadManager().stopTopologyService();
     configManager.getLoadManager().stopLoadServices();
     configManager.getProcedureManager().stopExecutor();
     configManager.getRetryFailedTasksThread().stopRetryFailedTasksService();
@@ -288,6 +289,10 @@ public class ConfigRegionStateMachine implements 
IStateMachine, IStateMachine.Ev
     // Always start load services first
     configManager.getLoadManager().startLoadServices();
 
+    if (CONF.isEnableTopologyProbing()) {
+      configManager.getLoadManager().startTopologyService();
+    }
+
     // Start leader scheduling services
     configManager.getProcedureManager().startExecutor();
     threadPool.submit(
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 327f842e966..34533362d44 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -1744,6 +1744,7 @@ public class ConfigManager implements IManager {
       TrimProperties properties = new TrimProperties();
       properties.putAll(req.getConfigs());
 
+      boolean wasTopologyProbingEnabled = CONF.isEnableTopologyProbing();
       if (configurationFileFound) {
         File file = new File(url.getFile());
         try {
@@ -1767,6 +1768,7 @@ public class ConfigManager implements IManager {
         }
         LOGGER.warn(msg);
       }
+      handleTopologyProbingHotReload(wasTopologyProbingEnabled);
       if (currentNodeId == req.getNodeId()) {
         return tsStatus;
       }
@@ -1778,6 +1780,18 @@ public class ConfigManager implements IManager {
     return RpcUtils.squashResponseStatusList(statusList);
   }
 
+  private void handleTopologyProbingHotReload(boolean wasEnabled) {
+    boolean isEnabled = CONF.isEnableTopologyProbing();
+    if (wasEnabled == isEnabled) {
+      return;
+    }
+    if (isEnabled && getConsensusManager().isLeader()) {
+      getLoadManager().startTopologyService();
+    } else if (!isEnabled) {
+      getLoadManager().stopTopologyService();
+    }
+  }
+
   @Override
   public TSStatus startRepairData() {
     TSStatus status = confirmLeader();
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 993bfc0e400..e97f32bdbda 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -152,7 +152,6 @@ public class LoadManager {
     statisticsService.startLoadStatisticsService();
     eventService.startEventService();
     partitionBalancer.setupPartitionBalancer();
-    topologyService.startTopologyService();
   }
 
   public void stopLoadServices() {
@@ -162,6 +161,13 @@ public class LoadManager {
     loadCache.clearHeartbeatCache();
     partitionBalancer.clearPartitionBalancer();
     routeBalancer.clearRegionPriority();
+  }
+
+  public void startTopologyService() {
+    topologyService.startTopologyService();
+  }
+
+  public void stopTopologyService() {
     topologyService.stopTopologyService();
   }
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index 07105d96bbd..41a0dbc5c46 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -787,7 +787,7 @@ public class LoadCache {
       for (int fromId : latestTopology.keySet()) {
         for (int toId : latestTopology.keySet()) {
           boolean originReachable =
-              latestTopology.getOrDefault(fromId, 
Collections.emptySet()).contains(toId);
+              topologyGraph.getOrDefault(fromId, 
Collections.emptySet()).contains(toId);
           boolean newReachable =
               latestTopology.getOrDefault(fromId, 
Collections.emptySet()).contains(toId);
           if (originReachable != newReachable) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index 64322da5bbb..100b0cbf384 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -49,7 +49,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Future;
@@ -168,13 +167,6 @@ public class HeartbeatService {
       
heartbeatReq.setSpaceQuotaUsage(configManager.getClusterQuotaManager().getSpaceQuotaUsage());
     }
 
-    final Map<Integer, Set<Integer>> topologyMap =
-        configManager.getLoadManager().getLoadCache().getTopology();
-    if (topologyMap != null) {
-      heartbeatReq.setTopology(topologyMap);
-      
heartbeatReq.setDataNodes(configManager.getNodeManager().getRegisteredDataNodeLocations());
-    }
-
     // We broadcast region operations list every 100 heartbeat loops
     if (heartbeatCounter.get() % 100 == 0) {
       heartbeatReq.setCurrentRegionOperations(
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
index 9e4f6bd6121..728c2ce7ffc 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.manager.load.service;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
 import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
 import org.apache.iotdb.commons.cluster.NodeStatus;
@@ -41,6 +42,7 @@ import 
org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
 import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
 import 
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
 import 
org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TUpdateClusterTopologyReq;
 
 import org.apache.ratis.util.AwaitForSignal;
 import org.apache.tsfile.utils.Pair;
@@ -49,6 +51,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -57,7 +61,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -67,8 +70,6 @@ import java.util.stream.Collectors;
 
 public class TopologyService implements Runnable, IClusterStatusSubscriber {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TopologyService.class);
-  private static final long PROBING_INTERVAL_MS = 5_000L;
-  private static final long PROBING_TIMEOUT_MS = PROBING_INTERVAL_MS;
   private static final int SAMPLING_WINDOW_SIZE = 100;
 
   private final ExecutorService topologyThread =
@@ -85,11 +86,18 @@ public class TopologyService implements Runnable, 
IClusterStatusSubscriber {
 
   /* (fromDataNodeId, toDataNodeId) -> heartbeat history */
   private final Map<Pair<Integer, Integer>, List<AbstractHeartbeatSample>> 
heartbeats;
-  private final List<Integer> startingDataNodes = new CopyOnWriteArrayList<>();
 
   private final IFailureDetector failureDetector;
   private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
 
+  private int proberRotationIndex = 0;
+
+  /** Last topology pushed to each DataNode, updated only on successful push. 
*/
+  private final Map<Integer, Set<Integer>> lastPushedTopology = new 
ConcurrentHashMap<>();
+
+  /** Latest computed topology, updated each probing round. */
+  private final Map<Integer, Set<Integer>> latestTopology = new 
ConcurrentHashMap<>();
+
   public TopologyService(
       IManager configManager, Consumer<Map<Integer, Set<Integer>>> 
topologyChangeListener) {
     this.configManager = configManager;
@@ -98,7 +106,6 @@ public class TopologyService implements Runnable, 
IClusterStatusSubscriber {
     this.shouldRun = new AtomicBoolean(false);
     this.awaitForSignal = new AwaitForSignal(this.getClass().getSimpleName());
 
-    // here we use the same failure
     switch (CONF.getFailureDetector()) {
       case IFailureDetector.PHI_ACCRUAL_DETECTOR:
         this.failureDetector =
@@ -126,22 +133,20 @@ public class TopologyService implements Runnable, 
IClusterStatusSubscriber {
 
   public synchronized void stopTopologyService() {
     shouldRun.set(false);
-    future.cancel(true);
-    future = null;
+    if (future != null) {
+      future.cancel(true);
+      future = null;
+    }
     heartbeats.clear();
+    latestTopology.clear();
     LOGGER.info("Topology Probing has stopped successfully");
   }
 
-  /**
-   * Schedule the {@link #topologyProbing} task either: 1. every 
PROBING_INTERVAL_MS interval. 2.
-   * Manually triggered by outside events (node restart / register, etc.).
-   */
   private boolean mayWait() {
     try {
-      this.awaitForSignal.await(PROBING_INTERVAL_MS, TimeUnit.MILLISECONDS);
+      this.awaitForSignal.await(CONF.getTopologyProbingBaseIntervalInMs(), 
TimeUnit.MILLISECONDS);
       return true;
     } catch (InterruptedException e) {
-      // we don't reset the interrupt flag here since we may reuse this thread 
again.
       return false;
     }
   }
@@ -153,36 +158,70 @@ public class TopologyService implements Runnable, 
IClusterStatusSubscriber {
     }
   }
 
+  /**
+   * Select sqrt(N) DataNodes as probers, rotating through all DataNodes 
across cycles so that every
+   * DataNode gets to be a prober over sqrt(N) cycles.
+   */
+  private List<TDataNodeLocation> selectProbers(List<TDataNodeLocation> 
allDataNodes) {
+    int n = allDataNodes.size();
+    if (n <= 1) {
+      return allDataNodes;
+    }
+    int sqrtN = (int) Math.ceil(Math.sqrt(n));
+    List<TDataNodeLocation> sorted = new ArrayList<>(allDataNodes);
+    sorted.sort(Comparator.comparingInt(TDataNodeLocation::getDataNodeId));
+    int startIndex = (proberRotationIndex * sqrtN) % n;
+    proberRotationIndex++;
+    List<TDataNodeLocation> probers = new ArrayList<>(sqrtN);
+    for (int i = 0; i < sqrtN && i < n; i++) {
+      probers.add(sorted.get((startIndex + i) % n));
+    }
+    return probers;
+  }
+
   private synchronized void topologyProbing() {
-    // 1. get the latest datanode list
+    // 1. get Running DataNodes only
     final List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
     final Set<Integer> dataNodeIds = new HashSet<>();
     for (final TDataNodeConfiguration dataNodeConf :
         configManager.getNodeManager().getRegisteredDataNodes()) {
       final TDataNodeLocation location = dataNodeConf.getLocation();
-      if (startingDataNodes.contains(location.getDataNodeId())) {
-        continue; // we shall wait for internal endpoint to be ready
+      if 
(configManager.getLoadManager().getNodeStatus(location.getDataNodeId())
+          != NodeStatus.Running) {
+        continue;
       }
       dataNodeLocations.add(location);
       dataNodeIds.add(location.getDataNodeId());
     }
 
-    // 2. send the verify connection RPC to all datanodes
+    // 2. compute probing timeout
+    final long timeout =
+        (long) (CONF.getTopologyProbingBaseIntervalInMs() * 
CONF.getTopologyProbingTimeoutRatio());
+
+    // 3. select sqrt(N) probers via rotating selection
+    final List<TDataNodeLocation> probers = selectProbers(dataNodeLocations);
+    final Set<Integer> proberIds =
+        
probers.stream().map(TDataNodeLocation::getDataNodeId).collect(Collectors.toSet());
+
+    // 4. build TNodeLocations with ALL DataNode locations (so probers test 
all targets)
     final TNodeLocations nodeLocations = new TNodeLocations();
     nodeLocations.setDataNodeLocations(dataNodeLocations);
     nodeLocations.setConfigNodeLocations(Collections.emptyList());
-    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
-        configManager.getNodeManager().getRegisteredDataNodes().stream()
-            .map(TDataNodeConfiguration::getLocation)
+
+    // 5. build proberLocationMap containing only the selected probers
+    final Map<Integer, TDataNodeLocation> proberLocationMap =
+        probers.stream()
             .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, 
location -> location));
+
+    // 6. send async requests ONLY to probers with computed timeout
     final DataNodeAsyncRequestContext<TNodeLocations, TTestConnectionResp>
         dataNodeAsyncRequestContext =
             new DataNodeAsyncRequestContext<>(
                 CnToDnAsyncRequestType.SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK,
                 nodeLocations,
-                dataNodeLocationMap);
+                proberLocationMap);
     CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, 
PROBING_TIMEOUT_MS);
+        .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, timeout);
     final List<TTestConnectionResult> results = new ArrayList<>();
     dataNodeAsyncRequestContext
         .getResponseMap()
@@ -193,7 +232,7 @@ public class TopologyService implements Runnable, 
IClusterStatusSubscriber {
               }
             });
 
-    // 3. collect results and update the heartbeat timestamps
+    // 7. collect results and update the heartbeat timestamps
     for (final TTestConnectionResult result : results) {
       final int fromDataNodeId =
           Optional.ofNullable(result.getSender().getDataNodeLocation())
@@ -203,8 +242,6 @@ public class TopologyService implements Runnable, 
IClusterStatusSubscriber {
       if (result.isSuccess()
           && dataNodeIds.contains(fromDataNodeId)
           && dataNodeIds.contains(toDataNodeId)) {
-        // testAllDataNodeConnectionWithTimeout ensures the heartbeats are 
Dn-Dn internally. Here we
-        // just double-check.
         final List<AbstractHeartbeatSample> heartbeatHistory =
             heartbeats.computeIfAbsent(
                 new Pair<>(fromDataNodeId, toDataNodeId), p -> new 
LinkedList<>());
@@ -215,40 +252,128 @@ public class TopologyService implements Runnable, 
IClusterStatusSubscriber {
       }
     }
 
-    // 4. use failure detector to identify potential network partitions
-    final Map<Integer, Set<Integer>> latestTopology =
-        dataNodeLocations.stream()
-            .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, k -> 
new HashSet<>()));
+    // 8. build topology: for non-probers carry forward previous results 
(default all-connected),
+    //    for probers run failure detector
+    final Map<Integer, Set<Integer>> computedTopology = new HashMap<>();
+    for (int nodeId : dataNodeIds) {
+      if (proberIds.contains(nodeId)) {
+        computedTopology.put(nodeId, new HashSet<>());
+      } else {
+        Set<Integer> prev = latestTopology.get(nodeId);
+        if (prev != null) {
+          Set<Integer> carried = new HashSet<>(prev);
+          carried.retainAll(dataNodeIds);
+          computedTopology.put(nodeId, carried);
+        } else {
+          computedTopology.put(nodeId, new HashSet<>(dataNodeIds));
+        }
+      }
+    }
+
     for (final Map.Entry<Pair<Integer, Integer>, 
List<AbstractHeartbeatSample>> entry :
         heartbeats.entrySet()) {
       final int fromId = entry.getKey().getLeft();
       final int toId = entry.getKey().getRight();
 
+      if (!proberIds.contains(fromId)
+          || !dataNodeIds.contains(fromId)
+          || !dataNodeIds.contains(toId)) {
+        continue;
+      }
+
       if (!entry.getValue().isEmpty()
           && !failureDetector.isAvailable(entry.getKey(), entry.getValue())) {
         LOGGER.debug("Connection from DataNode {} to DataNode {} is broken", 
fromId, toId);
       } else {
-        Optional.ofNullable(latestTopology.get(fromId)).ifPresent(s -> 
s.add(toId));
+        computedTopology.get(fromId).add(toId);
+      }
+    }
+
+    // For prober nodes: pairs with no heartbeat history default to connected
+    for (int fromId : proberIds) {
+      if (!dataNodeIds.contains(fromId)) {
+        continue;
+      }
+      Set<Integer> reachableSet = computedTopology.get(fromId);
+      for (int toId : dataNodeIds) {
+        if (!heartbeats.containsKey(new Pair<>(fromId, toId))) {
+          reachableSet.add(toId);
+        }
       }
     }
 
-    logAsymmetricPartition(latestTopology);
+    // Save computed topology for next round
+    latestTopology.clear();
+    for (Map.Entry<Integer, Set<Integer>> entry : computedTopology.entrySet()) 
{
+      latestTopology.put(entry.getKey(), new HashSet<>(entry.getValue()));
+    }
+
+    logAsymmetricPartition(computedTopology);
 
-    // 5. notify the listeners on topology change
+    // 9. notify the listeners on topology change
     if (shouldRun.get()) {
-      topologyChangeListener.accept(latestTopology);
+      topologyChangeListener.accept(computedTopology);
     }
+
+    // 10. push topology changes to DataNodes
+    pushTopologyToDataNodes(computedTopology, dataNodeLocations);
   }
 
   /**
-   * We only consider warning (one vs remaining) network partition. If we need 
to cover more
-   * complicated scenarios like (many vs many) network partition, we shall use 
graph algorithms
-   * then.
+   * Push topology changes to DataNodes via PUSH_TOPOLOGY request. Each 
DataNode receives only its
+   * own reachable set. lastPushedTopology is updated only on successful push.
    */
+  private void pushTopologyToDataNodes(
+      Map<Integer, Set<Integer>> computedTopology, List<TDataNodeLocation> dataNodeLocations) {
+    // Full id -> location view of the cluster; the same map is attached to every request.
+    final Map<Integer, TDataNodeLocation> dataNodesMap =
+        dataNodeLocations.stream()
+            .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, loc -> loc));
+
+    // Target only the DataNodes whose reachable set differs from the last acknowledged push
+    // (a node that was never pushed to has no entry and is therefore always targeted).
+    final Map<Integer, TDataNodeLocation> targetMap = new HashMap<>();
+    for (final TDataNodeLocation location : dataNodeLocations) {
+      final int nodeId = location.getDataNodeId();
+      final Set<Integer> reachableSet =
+          computedTopology.getOrDefault(nodeId, Collections.emptySet());
+      if (!reachableSet.equals(lastPushedTopology.get(nodeId))) {
+        targetMap.put(nodeId, location);
+      }
+    }
+
+    if (targetMap.isEmpty()) {
+      return;
+    }
+
+    // One request per target, carrying only that node's own reachable set.
+    final DataNodeAsyncRequestContext<TUpdateClusterTopologyReq, TSStatus> context =
+        new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.PUSH_TOPOLOGY, targetMap);
+    for (final int nodeId : targetMap.keySet()) {
+      final Map<Integer, Set<Integer>> perNodeTopology = new HashMap<>();
+      perNodeTopology.put(
+          nodeId, new HashSet<>(computedTopology.getOrDefault(nodeId, Collections.emptySet())));
+      context.putRequest(nodeId, new TUpdateClusterTopologyReq(dataNodesMap, perNodeTopology));
+    }
+
+    CnToDnInternalServiceAsyncRequestManager.getInstance()
+        .sendAsyncRequestWithTimeoutInMs(context, CONF.getTopologyProbingBaseIntervalInMs());
+
+    // Record a push as delivered only when the DataNode acknowledged it with SUCCESS. An error
+    // status must leave lastPushedTopology untouched so the next round pushes again; otherwise
+    // the DataNode would keep a stale view until its reachable set happens to change.
+    context
+        .getResponseMap()
+        .forEach(
+            (nodeId, resp) -> {
+              if (resp == null
+                  || resp.getCode()
+                      != org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+                LOGGER.warn(
+                    "Failed to push topology to DataNode {}, will retry next round: {}",
+                    nodeId,
+                    resp);
+                return;
+              }
+              lastPushedTopology.put(
+                  nodeId,
+                  new HashSet<>(computedTopology.getOrDefault(nodeId, Collections.emptySet())));
+            });
+  }
+
   private void logAsymmetricPartition(final Map<Integer, Set<Integer>> 
topology) {
     final Set<Integer> nodes = topology.keySet();
     if (nodes.size() == 1) {
-      // 1 DataNode
       return;
     }
 
@@ -258,11 +383,9 @@ public class TopologyService implements Runnable, 
IClusterStatusSubscriber {
           continue;
         }
 
-        // whether we have asymmetric partition [from -> to]
         final Set<Integer> reachableFrom = topology.get(from);
         final Set<Integer> reachableTo = topology.get(to);
         if (reachableFrom.size() <= 1 || reachableTo.size() <= 1) {
-          // symmetric partition for (from) or (to)
           continue;
         }
         if (!reachableTo.contains(from) && !reachableFrom.contains(to)) {
@@ -272,47 +395,22 @@ public class TopologyService implements Runnable, 
IClusterStatusSubscriber {
     }
   }
 
-  /** We only listen to datanode remove / restart / register events */
+  /**
+   * Clean up heartbeat and push state when a node is removed or entering Removing status.
+   *
+   * <p>For every entry of the event's changed-statistics map whose right-hand statistics are
+   * null (which the previous implementation treated as the node having left the cluster) or
+   * report {@code NodeStatus.Removing}, this drops every probe history keyed by a pair that
+   * mentions the node, together with the node's entries in {@code lastPushedTopology} and
+   * {@code latestTopology}. All other status transitions are ignored.
+   *
+   * @param event the statistics change notification to react to
+   */
   @Override
   public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) {
-    final Set<Integer> datanodeIds =
-        
configManager.getNodeManager().getRegisteredDataNodeLocations().keySet();
-    final Map<Integer, Pair<NodeStatistics, NodeStatistics>> changes =
-        event.getDifferentNodeStatisticsMap();
     for (final Map.Entry<Integer, Pair<NodeStatistics, NodeStatistics>> entry :
-        changes.entrySet()) {
+        event.getDifferentNodeStatisticsMap().entrySet()) {
       final Integer nodeId = entry.getKey();
-      final Pair<NodeStatistics, NodeStatistics> changeEvent = 
entry.getValue();
-      if (!datanodeIds.contains(nodeId)) {
-        continue;
-      }
-      if (changeEvent.getLeft() == null) {
-        // if a new datanode registered, DO NOT trigger probing immediately
-        startingDataNodes.add(nodeId);
-        continue;
-      } else {
-        startingDataNodes.remove(nodeId);
-      }
-
-      final Set<Pair<Integer, Integer>> affectedPairs =
-          heartbeats.keySet().stream()
-              .filter(
-                  pair ->
-                      Objects.equals(pair.getLeft(), nodeId)
-                          || Objects.equals(pair.getRight(), nodeId))
-              .collect(Collectors.toSet());
-
-      if (changeEvent.getRight() == null) {
-        // datanode removed from cluster, clean up probing history
-        affectedPairs.forEach(heartbeats::remove);
-      } else {
-        // we only trigger probing immediately if node comes around from 
UNKNOWN to RUNNING
-        if (NodeStatus.Unknown.equals(changeEvent.getLeft().getStatus())
-            && NodeStatus.Running.equals(changeEvent.getRight().getStatus())) {
-          // let's clear the history when a new node comes around
-          affectedPairs.forEach(pair -> heartbeats.put(pair, new 
ArrayList<>()));
-          awaitForSignal.signal();
-        }
+      final Pair<NodeStatistics, NodeStatistics> change = entry.getValue();
+      if (change.getRight() == null || 
NodeStatus.Removing.equals(change.getRight().getStatus())) {
+        heartbeats
+            .keySet()
+            .removeIf( // drop histories in both directions: node as sender or as receiver
+                pair ->
+                    Objects.equals(pair.getLeft(), nodeId)
+                        || Objects.equals(pair.getRight(), nodeId));
+        lastPushedTopology.remove(nodeId); // forget what was last pushed to this node
+        latestTopology.remove(nodeId); // forget its carried-forward reachable set
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 84479c9dcd2..436e78906df 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -52,6 +52,7 @@ import 
org.apache.iotdb.commons.client.request.AsyncRequestContext;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.concurrent.Await;
 import org.apache.iotdb.commons.concurrent.AwaitTimeoutException;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import 
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
@@ -319,6 +320,7 @@ import 
org.apache.iotdb.mpp.rpc.thrift.TTableDeviceDeletionWithPatternAndFilterR
 import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceDeletionWithPatternOrModReq;
 import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceInvalidateCacheReq;
 import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TUpdateClusterTopologyReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateTableReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
@@ -373,6 +375,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -417,6 +420,14 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   private final ClusterTopology clusterTopology = 
ClusterTopology.getInstance();
 
+  private static final long TEST_CONNECTION_TIMEOUT_MS =
+      CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS();
+
+  private static final ExecutorService TOPOLOGY_PROBING_EXECUTOR =
+      IoTDBThreadPoolFactory.newFixedThreadPool(
+          Math.max(1, Runtime.getRuntime().availableProcessors() / 4),
+          ThreadName.DATANODE_TOPOLOGY_PROBING.getName());
+
   private final CommonConfig commonConfig = 
CommonDescriptor.getInstance().getConfig();
 
   private final DataNodeContext dataNodeContext;
@@ -2076,9 +2087,28 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
   @Override
   public TTestConnectionResp submitInternalTestConnectionTask(TNodeLocations 
nodeLocations)
       throws TException {
-    return new TTestConnectionResp(
-        new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
-        
testAllDataNodeConnectionInHeartbeatChannel(nodeLocations.getDataNodeLocations()));
+    Future<TTestConnectionResp> future =
+        TOPOLOGY_PROBING_EXECUTOR.submit( // probe on the dedicated pool so the wait below can be bounded
+            () ->
+                new TTestConnectionResp(
+                    new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
+                    testAllDataNodeConnectionInHeartbeatChannel(
+                        nodeLocations.getDataNodeLocations())));
+    try {
+      return future.get(TEST_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS); // wait at most the DN connection timeout
+    } catch (TimeoutException e) { // probe overran: answer with an error status and no results
+      return new TTestConnectionResp(
+          new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+              .setMessage("Topology probing timed out after " + 
TEST_CONNECTION_TIMEOUT_MS + "ms"),
+          Collections.emptyList());
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the interrupt flag before surfacing the failure
+      throw new TException(e);
+    } catch (Exception e) { // includes a failure raised inside the probe task itself
+      throw new TException(e);
+    } finally {
+      future.cancel(true); // no effect once the task has completed; otherwise interrupts the still-running probe
+    }
   }
 
   private static <Location, RequestType> List<TTestConnectionResult> 
testConnections(
@@ -2105,7 +2135,8 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
         TServiceType.ConfigNodeInternalService,
         DnToCnRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, DnToCnRequestType, 
TConfigNodeLocation> handler) ->
-            
DnToCnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
+            DnToCnInternalServiceAsyncRequestManager.getInstance()
+                .sendAsyncRequestWithTimeoutInMs(handler, 
TEST_CONNECTION_TIMEOUT_MS));
   }
 
   private List<TTestConnectionResult> 
testAllDataNodeConnectionInHeartbeatChannel(
@@ -2117,7 +2148,8 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
         TServiceType.DataNodeInternalService,
         DnToDnRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, DnToDnRequestType, 
TDataNodeLocation> handler) ->
-            
DataNodeIntraHeartbeatManager.getInstance().sendAsyncRequest(handler, 1, null, 
true));
+            DataNodeIntraHeartbeatManager.getInstance()
+                .sendAsyncRequest(handler, 1, TEST_CONNECTION_TIMEOUT_MS, 
true));
   }
 
   private List<TTestConnectionResult> testAllDataNodeInternalServiceConnection(
@@ -2129,7 +2161,8 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
         TServiceType.DataNodeInternalService,
         DnToDnRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, DnToDnRequestType, 
TDataNodeLocation> handler) ->
-            
DnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
+            DnToDnInternalServiceAsyncRequestManager.getInstance()
+                .sendAsyncRequestWithTimeoutInMs(handler, 
TEST_CONNECTION_TIMEOUT_MS));
   }
 
   private List<TTestConnectionResult> testAllDataNodeMPPServiceConnection(
@@ -2141,7 +2174,8 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
         TServiceType.DataNodeMPPService,
         DnToDnRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, DnToDnRequestType, 
TDataNodeLocation> handler) ->
-            
DataNodeMPPServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
+            DataNodeMPPServiceAsyncRequestManager.getInstance()
+                .sendAsyncRequestWithTimeoutInMs(handler, 
TEST_CONNECTION_TIMEOUT_MS));
   }
 
   private List<TTestConnectionResult> testAllDataNodeExternalServiceConnection(
@@ -2153,7 +2187,8 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
         TServiceType.DataNodeExternalService,
         DnToDnRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, DnToDnRequestType, 
TDataNodeLocation> handler) ->
-            
DataNodeExternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
+            DataNodeExternalServiceAsyncRequestManager.getInstance()
+                .sendAsyncRequestWithTimeoutInMs(handler, 
TEST_CONNECTION_TIMEOUT_MS));
   }
 
   @Override
@@ -2161,6 +2196,12 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
+  @Override
+  public TSStatus updateClusterTopology(TUpdateClusterTopologyReq req) throws 
TException {
+    clusterTopology.updateTopology(req.getDataNodes(), req.getTopology()); // hand the pushed view to the local ClusterTopology
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); // always acknowledged with SUCCESS
+  }
+
   private PathPatternTree filterPathPatternTree(PathPatternTree patternTree, 
String database) {
     PathPatternTree filteredPatternTree = new PathPatternTree();
     try {
@@ -2255,10 +2296,6 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
       }
     }
 
-    if (req.isSetTopology() && req.isSetDataNodes()) {
-      clusterTopology.updateTopology(req.getDataNodes(), req.getTopology());
-    }
-
     if (req.isSetCurrentRegionOperations()) {
       RegionMigrateService.getInstance()
           .notifyRegionMigration(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
index 64ea8b8a2ea..2af51aec26b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
@@ -39,32 +39,43 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+/**
+ * Tracks this DataNode's view of cluster network topology for partition-aware 
query planning. Only
+ * stores the set of DataNodes reachable from this node, pushed by 
ConfigNode's TopologyService.
+ */
 public class ClusterTopology {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterTopology.class);
   private final Integer myself;
   private final AtomicReference<Map<Integer, TDataNodeLocation>> dataNodes;
-  private final AtomicReference<Map<Integer, Set<Integer>>> topologyMap;
+
+  /** DataNode IDs reachable from this node. Empty means not yet probed by 
TopologyService. */
+  private final AtomicReference<Set<Integer>> myReachableNodes;
+
   private final AtomicBoolean isPartitioned = new AtomicBoolean();
 
   public static ClusterTopology getInstance() {
     return ClusterTopologyHolder.INSTANCE;
   }
 
+  /**
+   * Filters a replica set to only include DataNodes reachable from this node.
+   *
+   * <p>The input is returned unchanged when this node is not flagged as partitioned, when
+   * {@code origin} is null, or when the reachable set is empty. Otherwise a new replica set with
+   * the same region id is returned; its location list may be empty if none of the original
+   * locations is reachable.
+   *
+   * @param origin the replica set to filter, may be null
+   * @return {@code origin} itself or a filtered copy
+   */
   public TRegionReplicaSet getValidatedReplicaSet(TRegionReplicaSet origin) {
     if (!isPartitioned.get() || origin == null) {
       return origin;
     }
-    final Set<Integer> reachableToMyself =
-        Collections.unmodifiableSet(topologyMap.get().get(myself));
+    final Set<Integer> reachable = myReachableNodes.get();
+    if (reachable.isEmpty()) { // no reachability info: do not filter anything out
+      return origin;
+    }
     final List<TDataNodeLocation> locations = new ArrayList<>();
     for (final TDataNodeLocation location : origin.getDataNodeLocations()) {
-      if (reachableToMyself.contains(location.getDataNodeId())) {
+      if (reachable.contains(location.getDataNodeId())) {
         locations.add(location);
       }
     }
     return new TRegionReplicaSet(origin.getRegionId(), locations);
   }
 
+  /** Filters region-to-scan mappings, keeping only replicas reachable from 
this node. */
   public <T> Set<Map.Entry<TRegionReplicaSet, T>> filterReachableCandidates(
       Set<Map.Entry<TRegionReplicaSet, T>> input) {
     if (!isPartitioned.get()) {
@@ -87,6 +98,9 @@ public class ClusterTopology {
       final TRegionReplicaSet replicaSet = newMap.get(gid);
       if (replicaSet != null) {
         candidateMap.put(replicaSet, entry.getValue());
+      } else {
+        // Topology not yet available for this region — return original to 
avoid silent data loss
+        candidateMap.put(entry.getKey(), entry.getValue());
       }
     }
     return candidateMap.entrySet();
@@ -97,82 +111,61 @@ public class ClusterTopology {
       return all;
     }
     for (TRegionReplicaSet replicaSet : all) {
-      // some TRegionReplicaSet is unreachable since all DataNodes are down
       if (replicaSet.getDataNodeLocationsSize() == 0) {
         throw new ReplicaSetUnreachableException(replicaSet);
       }
     }
-    final Map<Integer, Set<Integer>> topologyMapCurrent =
-        Collections.unmodifiableMap(this.topologyMap.get());
-
-    // brute-force search to select DataNode candidates that can communicate 
to all
-    // TRegionReplicaSets
-    final List<Integer> dataNodeCandidates = new ArrayList<>();
-    for (final Integer datanode : topologyMapCurrent.keySet()) {
-      boolean reachableToAllSets = true;
-      final Set<Integer> datanodeReachableToThis = 
topologyMapCurrent.get(datanode);
-      for (final TRegionReplicaSet replicaSet : all) {
-        final List<Integer> replicaNodeLocations =
-            replicaSet.getDataNodeLocations().stream()
-                .map(TDataNodeLocation::getDataNodeId)
-                .collect(Collectors.toList());
-        replicaNodeLocations.retainAll(datanodeReachableToThis);
-        reachableToAllSets = !replicaNodeLocations.isEmpty();
-      }
-      if (reachableToAllSets) {
-        dataNodeCandidates.add(datanode);
-      }
+
+    final Set<Integer> reachable = this.myReachableNodes.get();
+    if (reachable.isEmpty()) {
+      return all;
     }
+    final Map<Integer, TDataNodeLocation> dataNodesCurrent = 
this.dataNodes.get();
 
-    // select TRegionReplicaSet candidates whose DataNode Locations contain at 
least one
-    // allReachableDataNodes
     final List<TRegionReplicaSet> reachableSetCandidates = new ArrayList<>();
     for (final TRegionReplicaSet replicaSet : all) {
-      final List<Integer> commonLocations =
+      final List<TDataNodeLocation> validLocations =
           replicaSet.getDataNodeLocations().stream()
-              .map(TDataNodeLocation::getDataNodeId)
+              .filter(loc -> reachable.contains(loc.getDataNodeId()))
+              .map(loc -> dataNodesCurrent.getOrDefault(loc.getDataNodeId(), 
loc))
               .collect(Collectors.toList());
-      commonLocations.retainAll(dataNodeCandidates);
-      if (!commonLocations.isEmpty()) {
-        final List<TDataNodeLocation> validLocations =
-            
commonLocations.stream().map(dataNodes.get()::get).collect(Collectors.toList());
-        final TRegionReplicaSet validCandidate =
-            new TRegionReplicaSet(replicaSet.getRegionId(), validLocations);
-        reachableSetCandidates.add(validCandidate);
+      if (!validLocations.isEmpty()) {
+        reachableSetCandidates.add(new 
TRegionReplicaSet(replicaSet.getRegionId(), validLocations));
+      } else {
+        // No reachable replica — return full set so upper layer can handle or 
report the error
+        reachableSetCandidates.add(replicaSet);
       }
     }
-
     return reachableSetCandidates;
   }
 
+  /**
+   * Called by ConfigNode's TopologyService push. Updates this node's reachable set.
+   *
+   * <p>Only the entry of {@code latestTopology} keyed by this node's own id is read; a missing
+   * entry is treated as an empty set. When the reachable set changes, every per-node transition
+   * is logged and the new set is stored. The DataNode location map is always replaced, and the
+   * partitioned flag is recomputed on every call.
+   *
+   * @param dataNodes DataNode locations keyed by DataNode id
+   * @param latestTopology reachable DataNode ids keyed by source DataNode id
+   */
   public void updateTopology(
       final Map<Integer, TDataNodeLocation> dataNodes, Map<Integer, 
Set<Integer>> latestTopology) {
-    if (!latestTopology.equals(topologyMap.get())) {
-      LOGGER.info("[Topology] latest view from config-node: {}", 
latestTopology);
-      for (int fromId : dataNodes.keySet()) {
-        for (int toId : dataNodes.keySet()) {
-          boolean originReachable =
-              latestTopology.getOrDefault(fromId, 
Collections.emptySet()).contains(toId);
-          boolean newReachable =
-              latestTopology.getOrDefault(fromId, 
Collections.emptySet()).contains(toId);
-          if (originReachable != newReachable) {
-            LOGGER.info(
-                "[Topology] Topology of DataNode {} is now {} to DataNode {}",
-                fromId,
-                newReachable ? "reachable" : "unreachable",
-                toId);
-          }
+    final Set<Integer> newReachable = latestTopology.getOrDefault(myself, 
Collections.emptySet());
+    final Set<Integer> oldReachable = this.myReachableNodes.get();
+
+    if (!newReachable.equals(oldReachable)) {
+      LOGGER.info(
+          "[Topology] latest view from config-node for myself({}): {}", 
myself, newReachable);
+      for (int toId : dataNodes.keySet()) { // log each DataNode whose reachability flipped
+        boolean wasReachable = oldReachable.contains(toId);
+        boolean nowReachable = newReachable.contains(toId);
+        if (wasReachable != nowReachable) {
+          LOGGER.info(
+              "[Topology] DataNode {} is now {} to myself({})",
+              toId,
+              nowReachable ? "reachable" : "unreachable",
+              myself);
         }
       }
-      this.topologyMap.set(latestTopology);
+      this.myReachableNodes.set(newReachable);
     }
     this.dataNodes.set(dataNodes);
-    if (latestTopology.get(myself) == null || 
latestTopology.get(myself).isEmpty()) {
-      // latest topology doesn't include this node information.
-      // This mostly happens when this node just starts and haven't report 
connection details.
+    if (newReachable.isEmpty()) { // no reachability info for this node: disable filtering
       this.isPartitioned.set(false);
     } else {
-      this.isPartitioned.set(latestTopology.get(myself).size() != 
latestTopology.size());
+      this.isPartitioned.set(newReachable.size() != dataNodes.size()); // sizes differ: presumably some known DataNode is unreachable
     }
   }
 
@@ -180,7 +173,7 @@ public class ClusterTopology {
     this.myself =
         
IoTDBDescriptor.getInstance().getConfig().generateLocalDataNodeLocation().getDataNodeId();
     this.isPartitioned.set(false);
-    this.topologyMap = new AtomicReference<>(Collections.emptyMap());
+    this.myReachableNodes = new AtomicReference<>(Collections.emptySet());
     this.dataNodes = new AtomicReference<>(Collections.emptyMap());
   }
 
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 54d9ccaf5ce..69bcd275250 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -735,6 +735,21 @@ failure_detector_phi_threshold=30
 # Datatype: long
 failure_detector_phi_acceptable_pause_in_ms=10000
 
+# Whether to enable topology probing between DataNodes
+# effectiveMode: hot_reload
+# Datatype: Boolean
+# enable_topology_probing=false
+
+# Base interval in ms for topology probing between DataNodes
+# effectiveMode: restart
+# Datatype: long
+# topology_probing_base_interval_in_ms=5000
+
+# Ratio of probing timeout to probing interval (must be less than 1.0)
+# effectiveMode: restart
+# Datatype: double
+# topology_probing_timeout_ratio=0.5
+
 # Disk remaining threshold at which DataNode is set to ReadOnly status
 # effectiveMode: restart
 # Datatype: double(percentage)
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index b08416bd8c6..4bee7bd1b5b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -195,6 +195,7 @@ public enum ThreadName {
   INFLUXDB_RPC_PROCESSOR("InfluxdbRPC-Processor"),
   STORAGE_ENGINE_CACHED_POOL("StorageEngine"),
   DATANODE_SHUTDOWN_HOOK("DataNode-Shutdown-Hook"),
+  DATANODE_TOPOLOGY_PROBING("DataNode-Topology-Probing"),
   UPGRADE_TASK("UpgradeThread"),
   REGION_MIGRATE("Region-Migrate-Pool"),
   STORAGE_ENGINE_RECOVER_TRIGGER("StorageEngine-RecoverTrigger"),
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 4323e956a8e..92a7602b34d 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -332,6 +332,11 @@ struct TRegionRouteReq {
   2: required map<common.TConsensusGroupId, common.TRegionReplicaSet> 
regionRouteMap
 }
 
+struct TUpdateClusterTopologyReq {
+  1: required map<i32, common.TDataNodeLocation> dataNodes // DataNode locations keyed by DataNode id
+  2: required map<i32, set<i32>> topology // reachable DataNode ids keyed by source DataNode id
+}
+
 struct TUpdateTemplateReq {
   1: required byte type
   2: required binary templateInfo
@@ -1304,6 +1309,12 @@ service IDataNodeRPCService {
   /** Empty rpc, only for connection test */
   common.TSStatus testConnectionEmptyRPC()
 
+  /**
+   * Push cluster topology to this DataNode.
+   * Each DataNode receives only its own reachable set.
+   */
+  common.TSStatus updateClusterTopology(1:TUpdateClusterTopologyReq req)
+
   /** to write audit log or other events as time series **/
   common.TSStatus insertRecord(1:client.TSInsertRecordReq req);
 

Reply via email to