This is an automated email from the ASF dual-hosted git repository.

CRZbulabula pushed a commit to branch upgrade-heartbeat-service
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2ddad6d48ebe1c580491a8249c2a96102e901052
Author: Yongzao <[email protected]>
AuthorDate: Tue May 5 13:58:28 2026 +0800

    ready 4 review
---
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  33 +++++
 .../confignode/conf/ConfigNodeDescriptor.java      |  18 +++
 .../confignode/manager/load/cache/LoadCache.java   |   2 +-
 .../manager/load/service/HeartbeatService.java     |   8 +-
 .../manager/load/service/TopologyService.java      | 143 ++++++++++++++++++---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |   4 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  42 ++++--
 .../iotdb/db/queryengine/plan/ClusterTopology.java |   2 +-
 .../iotdb/commons/client/ClientPoolFactory.java    |   6 +-
 .../iotdb/commons/concurrent/ThreadName.java       |   1 +
 .../apache/iotdb/commons/conf/CommonConfig.java    |  10 ++
 .../iotdb/commons/conf/CommonDescriptor.java       |   6 +
 12 files changed, 237 insertions(+), 38 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index f305b06398d..306e6cd895f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -204,6 +204,15 @@ public class ConfigNodeConfig {
   /** Acceptable pause duration for Phi accrual failure detector */
   private long failureDetectorPhiAcceptablePauseInMs = 10000;
 
+  /** Base interval in ms for topology probing. Actual interval scales with DataNode count. */
+  private long topologyProbingBaseIntervalInMs = 5000;
+
+  /** Reference DataNode count for adaptive probing interval scaling. */
+  private int topologyProbingReferenceNodeCount = 10;
+
+  /** Ratio of probing timeout to probing interval (must be less than 1.0). */
+  private double topologyProbingTimeoutRatio = 0.5;
+
   /** The policy of cluster RegionGroups' leader distribution. */
   private String leaderDistributionPolicy = AbstractLeaderBalancer.CFD_POLICY;
 
@@ -1288,4 +1297,28 @@ public class ConfigNodeConfig {
   public void setFailureDetectorPhiAcceptablePauseInMs(long 
failureDetectorPhiAcceptablePauseInMs) {
     this.failureDetectorPhiAcceptablePauseInMs = 
failureDetectorPhiAcceptablePauseInMs;
   }
+
+  public long getTopologyProbingBaseIntervalInMs() {
+    return topologyProbingBaseIntervalInMs;
+  }
+
+  public void setTopologyProbingBaseIntervalInMs(long 
topologyProbingBaseIntervalInMs) {
+    this.topologyProbingBaseIntervalInMs = topologyProbingBaseIntervalInMs;
+  }
+
+  public int getTopologyProbingReferenceNodeCount() {
+    return topologyProbingReferenceNodeCount;
+  }
+
+  public void setTopologyProbingReferenceNodeCount(int 
topologyProbingReferenceNodeCount) {
+    this.topologyProbingReferenceNodeCount = topologyProbingReferenceNodeCount;
+  }
+
+  public double getTopologyProbingTimeoutRatio() {
+    return topologyProbingTimeoutRatio;
+  }
+
+  public void setTopologyProbingTimeoutRatio(double 
topologyProbingTimeoutRatio) {
+    this.topologyProbingTimeoutRatio = topologyProbingTimeoutRatio;
+  }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 77790dae1a9..506131a583a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -322,6 +322,24 @@ public class ConfigNodeDescriptor {
                 "failure_detector_phi_acceptable_pause_in_ms",
                 
String.valueOf(conf.getFailureDetectorPhiAcceptablePauseInMs()))));
 
+    conf.setTopologyProbingBaseIntervalInMs(
+        Long.parseLong(
+            properties.getProperty(
+                "topology_probing_base_interval_in_ms",
+                String.valueOf(conf.getTopologyProbingBaseIntervalInMs()))));
+
+    conf.setTopologyProbingReferenceNodeCount(
+        Integer.parseInt(
+            properties.getProperty(
+                "topology_probing_reference_node_count",
+                String.valueOf(conf.getTopologyProbingReferenceNodeCount()))));
+
+    conf.setTopologyProbingTimeoutRatio(
+        Double.parseDouble(
+            properties.getProperty(
+                "topology_probing_timeout_ratio",
+                String.valueOf(conf.getTopologyProbingTimeoutRatio()))));
+
     String leaderDistributionPolicy =
         properties.getProperty("leader_distribution_policy", 
conf.getLeaderDistributionPolicy());
     if (AbstractLeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index 07105d96bbd..41a0dbc5c46 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -787,7 +787,7 @@ public class LoadCache {
       for (int fromId : latestTopology.keySet()) {
         for (int toId : latestTopology.keySet()) {
           boolean originReachable =
-              latestTopology.getOrDefault(fromId, 
Collections.emptySet()).contains(toId);
+              topologyGraph.getOrDefault(fromId, 
Collections.emptySet()).contains(toId);
           boolean newReachable =
               latestTopology.getOrDefault(fromId, 
Collections.emptySet()).contains(toId);
           if (originReachable != newReachable) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index 64322da5bbb..e5f65152c87 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -49,7 +49,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Future;
@@ -168,12 +167,7 @@ public class HeartbeatService {
       
heartbeatReq.setSpaceQuotaUsage(configManager.getClusterQuotaManager().getSpaceQuotaUsage());
     }
 
-    final Map<Integer, Set<Integer>> topologyMap =
-        configManager.getLoadManager().getLoadCache().getTopology();
-    if (topologyMap != null) {
-      heartbeatReq.setTopology(topologyMap);
-      
heartbeatReq.setDataNodes(configManager.getNodeManager().getRegisteredDataNodeLocations());
-    }
+    // Topology is now pushed independently by TopologyService, no longer piggybacked on heartbeat
 
     // We broadcast region operations list every 100 heartbeat loops
     if (heartbeatCounter.get() % 100 == 0) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
index 9e4f6bd6121..1fd716fc7fc 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
@@ -21,9 +21,13 @@ package org.apache.iotdb.confignode.manager.load.service;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
 import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
 import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import 
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
@@ -41,14 +45,18 @@ import 
org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
 import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
 import 
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
 import 
org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq;
+import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
 
 import org.apache.ratis.util.AwaitForSignal;
+import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -67,8 +75,6 @@ import java.util.stream.Collectors;
 
 public class TopologyService implements Runnable, IClusterStatusSubscriber {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TopologyService.class);
-  private static final long PROBING_INTERVAL_MS = 5_000L;
-  private static final long PROBING_TIMEOUT_MS = PROBING_INTERVAL_MS;
   private static final int SAMPLING_WINDOW_SIZE = 100;
 
   private final ExecutorService topologyThread =
@@ -90,6 +96,16 @@ public class TopologyService implements Runnable, 
IClusterStatusSubscriber {
   private final IFailureDetector failureDetector;
   private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
 
+  /** Rotation index for sqrt(N) prober selection. */
+  private int proberRotationIndex = 0;
+
+  /** Last topology pushed to each DataNode, used to detect changes. */
+  private final Map<Integer, Set<Integer>> lastPushedTopology = new 
ConcurrentHashMap<>();
+
+  /** Client manager for pushing topology updates to DataNodes via heartbeat RPC. */
+  private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+      topologyPushClientManager;
+
   public TopologyService(
       IManager configManager, Consumer<Map<Integer, Set<Integer>>> 
topologyChangeListener) {
     this.configManager = configManager;
@@ -97,6 +113,10 @@ public class TopologyService implements Runnable, 
IClusterStatusSubscriber {
     this.heartbeats = new ConcurrentHashMap<>();
     this.shouldRun = new AtomicBoolean(false);
     this.awaitForSignal = new AwaitForSignal(this.getClass().getSimpleName());
+    this.topologyPushClientManager =
+        new IClientManager.Factory<TEndPoint, 
AsyncDataNodeInternalServiceClient>()
+            .createClientManager(
+                new 
ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory());
 
     // here we use the same failure
     switch (CONF.getFailureDetector()) {
@@ -133,12 +153,17 @@ public class TopologyService implements Runnable, 
IClusterStatusSubscriber {
   }
 
   /**
-   * Schedule the {@link #topologyProbing} task either: 1. every PROBING_INTERVAL_MS interval. 2.
+   * Schedule the {@link #topologyProbing} task either: 1. every adaptive probing interval. 2.
    * Manually triggered by outside events (node restart / register, etc.).
    */
   private boolean mayWait() {
     try {
-      this.awaitForSignal.await(PROBING_INTERVAL_MS, TimeUnit.MILLISECONDS);
+      long baseInterval = CONF.getTopologyProbingBaseIntervalInMs();
+      int dataNodeCount =
+          configManager.getNodeManager().getRegisteredDataNodes().size() - 
startingDataNodes.size();
+      int referenceNodeCount = CONF.getTopologyProbingReferenceNodeCount();
+      long interval = Math.max(baseInterval, baseInterval * dataNodeCount / 
referenceNodeCount);
+      this.awaitForSignal.await(interval, TimeUnit.MILLISECONDS);
       return true;
     } catch (InterruptedException e) {
       // we don't reset the interrupt flag here since we may reuse this thread again.
@@ -153,8 +178,29 @@ public class TopologyService implements Runnable, 
IClusterStatusSubscriber {
     }
   }
 
+  /**
+   * Select sqrt(N) DataNodes as probers, rotating through all DataNodes across cycles so that every
+   * DataNode gets to be a prober over sqrt(N) cycles.
+   */
+  private List<TDataNodeLocation> selectProbers(List<TDataNodeLocation> 
allDataNodes) {
+    int n = allDataNodes.size();
+    if (n <= 1) {
+      return allDataNodes;
+    }
+    int sqrtN = (int) Math.ceil(Math.sqrt(n));
+    List<TDataNodeLocation> sorted = new ArrayList<>(allDataNodes);
+    sorted.sort(Comparator.comparingInt(TDataNodeLocation::getDataNodeId));
+    int startIndex = (proberRotationIndex * sqrtN) % n;
+    proberRotationIndex++;
+    List<TDataNodeLocation> probers = new ArrayList<>(sqrtN);
+    for (int i = 0; i < sqrtN && i < n; i++) {
+      probers.add(sorted.get((startIndex + i) % n));
+    }
+    return probers;
+  }
+
   private synchronized void topologyProbing() {
-    // 1. get the latest datanode list
+    // 1. get the latest datanode list, filter out starting ones
     final List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
     final Set<Integer> dataNodeIds = new HashSet<>();
     for (final TDataNodeConfiguration dataNodeConf :
@@ -167,22 +213,36 @@ public class TopologyService implements Runnable, 
IClusterStatusSubscriber {
       dataNodeIds.add(location.getDataNodeId());
     }
 
-    // 2. send the verify connection RPC to all datanodes
+    // 2. compute adaptive interval and timeout from N = dataNodeLocations.size()
+    final long baseInterval = CONF.getTopologyProbingBaseIntervalInMs();
+    final int referenceNodeCount = CONF.getTopologyProbingReferenceNodeCount();
+    final double timeoutRatio = CONF.getTopologyProbingTimeoutRatio();
+    final int dataNodeCount = dataNodeLocations.size();
+    final long interval = Math.max(baseInterval, baseInterval * dataNodeCount 
/ referenceNodeCount);
+    final long timeout = (long) (interval * timeoutRatio);
+
+    // 3. select sqrt(N) probers via rotating selection
+    final List<TDataNodeLocation> probers = selectProbers(dataNodeLocations);
+
+    // 4. build TNodeLocations with ALL DataNode locations (so probers test all targets)
     final TNodeLocations nodeLocations = new TNodeLocations();
     nodeLocations.setDataNodeLocations(dataNodeLocations);
     nodeLocations.setConfigNodeLocations(Collections.emptyList());
-    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
-        configManager.getNodeManager().getRegisteredDataNodes().stream()
-            .map(TDataNodeConfiguration::getLocation)
+
+    // 5. build proberLocationMap containing only the selected probers
+    final Map<Integer, TDataNodeLocation> proberLocationMap =
+        probers.stream()
             .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, 
location -> location));
+
+    // 6. send async requests ONLY to probers (not all DataNodes) with computed timeout
     final DataNodeAsyncRequestContext<TNodeLocations, TTestConnectionResp>
         dataNodeAsyncRequestContext =
             new DataNodeAsyncRequestContext<>(
                 CnToDnAsyncRequestType.SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK,
                 nodeLocations,
-                dataNodeLocationMap);
+                proberLocationMap);
     CnToDnInternalServiceAsyncRequestManager.getInstance()
-        .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, 
PROBING_TIMEOUT_MS);
+        .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, timeout);
     final List<TTestConnectionResult> results = new ArrayList<>();
     dataNodeAsyncRequestContext
         .getResponseMap()
@@ -193,7 +253,7 @@ public class TopologyService implements Runnable, 
IClusterStatusSubscriber {
               }
             });
 
-    // 3. collect results and update the heartbeat timestamps
+    // 7. collect results and update the heartbeat timestamps
     for (final TTestConnectionResult result : results) {
       final int fromDataNodeId =
           Optional.ofNullable(result.getSender().getDataNodeLocation())
@@ -203,8 +263,6 @@ public class TopologyService implements Runnable, 
IClusterStatusSubscriber {
       if (result.isSuccess()
           && dataNodeIds.contains(fromDataNodeId)
           && dataNodeIds.contains(toDataNodeId)) {
-        // testAllDataNodeConnectionWithTimeout ensures the heartbeats are Dn-Dn internally. Here we
-        // just double-check.
         final List<AbstractHeartbeatSample> heartbeatHistory =
             heartbeats.computeIfAbsent(
                 new Pair<>(fromDataNodeId, toDataNodeId), p -> new 
LinkedList<>());
@@ -215,7 +273,7 @@ public class TopologyService implements Runnable, 
IClusterStatusSubscriber {
       }
     }
 
-    // 4. use failure detector to identify potential network partitions
+    // 8. use failure detector to identify potential network partitions (on ALL heartbeat pairs)
     final Map<Integer, Set<Integer>> latestTopology =
         dataNodeLocations.stream()
             .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, k -> 
new HashSet<>()));
@@ -234,10 +292,63 @@ public class TopologyService implements Runnable, 
IClusterStatusSubscriber {
 
     logAsymmetricPartition(latestTopology);
 
-    // 5. notify the listeners on topology change
+    // 9. notify the listeners on topology change
     if (shouldRun.get()) {
       topologyChangeListener.accept(latestTopology);
     }
+
+    // 10. push topology changes to DataNodes
+    pushTopologyToDataNodes(latestTopology, dataNodeLocations);
+  }
+
+  /**
+   * Push topology changes to DataNodes via heartbeat RPC. Each DataNode only receives a push when
+   * its own reachable set has changed since the last push.
+   */
+  private void pushTopologyToDataNodes(
+      Map<Integer, Set<Integer>> latestTopology, List<TDataNodeLocation> 
dataNodeLocations) {
+    // Build dataNodes map once for all pushes
+    final Map<Integer, TDataNodeLocation> dataNodesMap =
+        dataNodeLocations.stream()
+            .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, loc -> 
loc));
+
+    for (final TDataNodeLocation location : dataNodeLocations) {
+      final int nodeId = location.getDataNodeId();
+      final Set<Integer> reachableSet = latestTopology.getOrDefault(nodeId, 
Collections.emptySet());
+      final Set<Integer> lastPushed = lastPushedTopology.get(nodeId);
+
+      if (lastPushed != null && lastPushed.equals(reachableSet)) {
+        continue;
+      }
+
+      final TDataNodeHeartbeatReq req =
+          new TDataNodeHeartbeatReq(System.nanoTime(), false, false, 0L, 0L);
+      req.setTopology(latestTopology);
+      req.setDataNodes(dataNodesMap);
+
+      final TEndPoint endPoint = location.getInternalEndPoint();
+      try {
+        topologyPushClientManager
+            .borrowClient(endPoint)
+            .getDataNodeHeartBeat(
+                req,
+                new AsyncMethodCallback<TDataNodeHeartbeatResp>() {
+                  @Override
+                  public void onComplete(TDataNodeHeartbeatResp response) {
+                    // No-op: topology push is fire-and-forget
+                  }
+
+                  @Override
+                  public void onError(Exception exception) {
+                    // No-op: topology push failures are silently ignored
+                  }
+                });
+        lastPushedTopology.put(nodeId, new HashSet<>(reachableSet));
+      } catch (Exception e) {
+        LOGGER.debug(
+            "Failed to push topology to DataNode {} at {}: {}", nodeId, 
endPoint, e.getMessage());
+      }
+    }
   }
 
   /**
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 031cfd3a62e..bc0a619afd8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import 
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
-import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
@@ -377,8 +376,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
   ///////////////////////// Heartbeat /////////////////////////
 
   public void collectPipeMetaList(final TDataNodeHeartbeatResp resp) throws 
TException {
-    if (!tryReadLockWithTimeOut(
-        
CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 
3)) {
+    if (!tryReadLockWithTimeOut(2)) {
       return;
     }
     try {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 84479c9dcd2..e889d936ace 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -52,6 +52,7 @@ import 
org.apache.iotdb.commons.client.request.AsyncRequestContext;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.concurrent.Await;
 import org.apache.iotdb.commons.concurrent.AwaitTimeoutException;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import 
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
@@ -417,6 +418,12 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   private final ClusterTopology clusterTopology = 
ClusterTopology.getInstance();
 
+  private static final long TEST_CONNECTION_TIMEOUT_MS =
+      CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS();
+
+  private static final ExecutorService TOPOLOGY_PROBING_EXECUTOR =
+      IoTDBThreadPoolFactory.newFixedThreadPool(2, 
ThreadName.DATANODE_TOPOLOGY_PROBING.getName());
+
   private final CommonConfig commonConfig = 
CommonDescriptor.getInstance().getConfig();
 
   private final DataNodeContext dataNodeContext;
@@ -2076,9 +2083,23 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
   @Override
   public TTestConnectionResp submitInternalTestConnectionTask(TNodeLocations 
nodeLocations)
       throws TException {
-    return new TTestConnectionResp(
-        new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
-        
testAllDataNodeConnectionInHeartbeatChannel(nodeLocations.getDataNodeLocations()));
+    try {
+      Future<TTestConnectionResp> future =
+          TOPOLOGY_PROBING_EXECUTOR.submit(
+              () ->
+                  new TTestConnectionResp(
+                      new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
+                      testAllDataNodeConnectionInHeartbeatChannel(
+                          nodeLocations.getDataNodeLocations())));
+      return future.get(TEST_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    } catch (TimeoutException e) {
+      return new TTestConnectionResp(
+          new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+              .setMessage("Topology probing timed out after " + 
TEST_CONNECTION_TIMEOUT_MS + "ms"),
+          Collections.emptyList());
+    } catch (Exception e) {
+      throw new TException(e);
+    }
   }
 
   private static <Location, RequestType> List<TTestConnectionResult> 
testConnections(
@@ -2105,7 +2126,8 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
         TServiceType.ConfigNodeInternalService,
         DnToCnRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, DnToCnRequestType, 
TConfigNodeLocation> handler) ->
-            
DnToCnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
+            DnToCnInternalServiceAsyncRequestManager.getInstance()
+                .sendAsyncRequestWithTimeoutInMs(handler, 
TEST_CONNECTION_TIMEOUT_MS));
   }
 
   private List<TTestConnectionResult> 
testAllDataNodeConnectionInHeartbeatChannel(
@@ -2117,7 +2139,8 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
         TServiceType.DataNodeInternalService,
         DnToDnRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, DnToDnRequestType, 
TDataNodeLocation> handler) ->
-            
DataNodeIntraHeartbeatManager.getInstance().sendAsyncRequest(handler, 1, null, 
true));
+            DataNodeIntraHeartbeatManager.getInstance()
+                .sendAsyncRequest(handler, 1, TEST_CONNECTION_TIMEOUT_MS, 
true));
   }
 
   private List<TTestConnectionResult> testAllDataNodeInternalServiceConnection(
@@ -2129,7 +2152,8 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
         TServiceType.DataNodeInternalService,
         DnToDnRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, DnToDnRequestType, 
TDataNodeLocation> handler) ->
-            
DnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
+            DnToDnInternalServiceAsyncRequestManager.getInstance()
+                .sendAsyncRequestWithTimeoutInMs(handler, 
TEST_CONNECTION_TIMEOUT_MS));
   }
 
   private List<TTestConnectionResult> testAllDataNodeMPPServiceConnection(
@@ -2141,7 +2165,8 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
         TServiceType.DataNodeMPPService,
         DnToDnRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, DnToDnRequestType, 
TDataNodeLocation> handler) ->
-            
DataNodeMPPServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
+            DataNodeMPPServiceAsyncRequestManager.getInstance()
+                .sendAsyncRequestWithTimeoutInMs(handler, 
TEST_CONNECTION_TIMEOUT_MS));
   }
 
   private List<TTestConnectionResult> testAllDataNodeExternalServiceConnection(
@@ -2153,7 +2178,8 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
         TServiceType.DataNodeExternalService,
         DnToDnRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, DnToDnRequestType, 
TDataNodeLocation> handler) ->
-            
DataNodeExternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
+            DataNodeExternalServiceAsyncRequestManager.getInstance()
+                .sendAsyncRequestWithTimeoutInMs(handler, 
TEST_CONNECTION_TIMEOUT_MS));
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
index 64ea8b8a2ea..194d9f8c6cd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
@@ -152,7 +152,7 @@ public class ClusterTopology {
       for (int fromId : dataNodes.keySet()) {
         for (int toId : dataNodes.keySet()) {
           boolean originReachable =
-              latestTopology.getOrDefault(fromId, 
Collections.emptySet()).contains(toId);
+              this.topologyMap.get().getOrDefault(fromId, 
Collections.emptySet()).contains(toId);
           boolean newReachable =
               latestTopology.getOrDefault(fromId, 
Collections.emptySet()).contains(toId);
           if (originReachable != newReachable) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index ea20f9c76cc..f9ffca123b5 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -182,7 +182,8 @@ public class ClientPoolFactory {
                   new ThriftClientProperty.Builder()
                       
.setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
                       
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
-                      
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+                      .setSelectorNumOfAsyncClientManager(
+                          conf.getHeartbeatSelectorNumOfClientManager())
                       .setPrintLogWhenEncounterException(false)
                       .build(),
                   ThreadName.ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL.getName()),
@@ -207,7 +208,8 @@ public class ClientPoolFactory {
                   new ThriftClientProperty.Builder()
                       
.setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
                       
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
-                      
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+                      .setSelectorNumOfAsyncClientManager(
+                          conf.getHeartbeatSelectorNumOfClientManager())
                       .setPrintLogWhenEncounterException(false)
                       .build(),
                   ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()),
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 20fa9d78d59..ad22c8b20ae 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -196,6 +196,7 @@ public enum ThreadName {
   INFLUXDB_RPC_PROCESSOR("InfluxdbRPC-Processor"),
   STORAGE_ENGINE_CACHED_POOL("StorageEngine"),
   DATANODE_SHUTDOWN_HOOK("DataNode-Shutdown-Hook"),
+  DATANODE_TOPOLOGY_PROBING("DataNode-Topology-Probing"),
   UPGRADE_TASK("UpgradeThread"),
   REGION_MIGRATE("Region-Migrate-Pool"),
   STORAGE_ENGINE_RECOVER_TRIGGER("StorageEngine-RecoverTrigger"),
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 19313723e55..598c9d5f021 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -155,6 +155,8 @@ public class CommonConfig {
    */
   private int selectorNumOfClientManager = 1;
 
+  private int heartbeatSelectorNumOfClientManager = 4;
+
   /** Whether to use thrift compression. */
   private boolean isRpcThriftCompressionEnabled = false;
 
@@ -697,6 +699,14 @@ public class CommonConfig {
     this.selectorNumOfClientManager = selectorNumOfClientManager;
   }
 
+  public int getHeartbeatSelectorNumOfClientManager() {
+    return heartbeatSelectorNumOfClientManager;
+  }
+
+  public void setHeartbeatSelectorNumOfClientManager(int 
heartbeatSelectorNumOfClientManager) {
+    this.heartbeatSelectorNumOfClientManager = 
heartbeatSelectorNumOfClientManager;
+  }
+
   public boolean isRpcThriftCompressionEnabled() {
     return isRpcThriftCompressionEnabled;
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index d392a60bbbd..11db54be34f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -178,6 +178,12 @@ public class CommonDescriptor {
                     String.valueOf(config.getSelectorNumOfClientManager()))
                 .trim()));
 
+    config.setHeartbeatSelectorNumOfClientManager(
+        Integer.parseInt(
+            properties.getProperty(
+                "heartbeat_selector_num_of_client_manager",
+                
String.valueOf(config.getHeartbeatSelectorNumOfClientManager()))));
+
     config.setMaxClientNumForEachNode(
         Integer.parseInt(
             properties

Reply via email to