This is an automated email from the ASF dual-hosted git repository.
CRZbulabula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d4be5c82e0f Improve TopologyService and HeartbeatService scalability
for large clusters (#17595)
d4be5c82e0f is described below
commit d4be5c82e0f8ba08b65484d529998a52591a7a5b
Author: Yongzao <[email protected]>
AuthorDate: Fri May 8 15:56:11 2026 +0800
Improve TopologyService and HeartbeatService scalability for large clusters
(#17595)
---
.../it/env/cluster/config/MppCommonConfig.java | 6 +
.../env/cluster/config/MppSharedCommonConfig.java | 7 +
.../it/env/remote/config/RemoteCommonConfig.java | 5 +
.../org/apache/iotdb/itbase/env/CommonConfig.java | 2 +
...ableAggregationQueryWithNetworkPartitionIT.java | 3 +-
.../client/async/CnToDnAsyncRequestType.java | 1 +
.../CnToDnInternalServiceAsyncRequestManager.java | 6 +
.../rpc/DataNodeAsyncRequestRPCHandler.java | 1 +
.../iotdb/confignode/conf/ConfigNodeConfig.java | 33 +++
.../confignode/conf/ConfigNodeDescriptor.java | 31 +++
.../statemachine/ConfigRegionStateMachine.java | 5 +
.../iotdb/confignode/manager/ConfigManager.java | 14 ++
.../iotdb/confignode/manager/load/LoadManager.java | 8 +-
.../confignode/manager/load/cache/LoadCache.java | 2 +-
.../manager/load/service/HeartbeatService.java | 8 -
.../manager/load/service/TopologyService.java | 250 ++++++++++++++-------
.../impl/DataNodeInternalRPCServiceImpl.java | 61 ++++-
.../iotdb/db/queryengine/plan/ClusterTopology.java | 109 +++++----
.../conf/iotdb-system.properties.template | 15 ++
.../iotdb/commons/concurrent/ThreadName.java | 1 +
.../src/main/thrift/datanode.thrift | 11 +
21 files changed, 422 insertions(+), 157 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 5d1ba8fc805..1578e8a3fbc 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -664,6 +664,12 @@ public class MppCommonConfig extends MppBaseConfig
implements CommonConfig {
return this;
}
+ @Override
+ public CommonConfig setEnableTopologyProbing(boolean enableTopologyProbing) {
+ setProperty("enable_topology_probing",
String.valueOf(enableTopologyProbing));
+ return this;
+ }
+
// For part of the log directory
public String getClusterConfigStr() {
return
fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS))
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index 29262c3e06b..c7d8becb373 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -699,4 +699,11 @@ public class MppSharedCommonConfig implements CommonConfig
{
cnConfig.setMaxRowsInCteBuffer(maxRows);
return this;
}
+
+ @Override
+ public CommonConfig setEnableTopologyProbing(boolean enableTopologyProbing) {
+ dnConfig.setEnableTopologyProbing(enableTopologyProbing);
+ cnConfig.setEnableTopologyProbing(enableTopologyProbing);
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index af48002c3a1..4e0b0d2d727 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -487,4 +487,9 @@ public class RemoteCommonConfig implements CommonConfig {
public CommonConfig setMaxRowsInCteBuffer(int maxRows) {
return this;
}
+
+ @Override
+ public CommonConfig setEnableTopologyProbing(boolean enableTopologyProbing) {
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 09fb5545ebd..fb4c4f56ba5 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -215,4 +215,6 @@ public interface CommonConfig {
CommonConfig setCteBufferSize(long cteBufferSize);
CommonConfig setMaxRowsInCteBuffer(int maxRows);
+
+ CommonConfig setEnableTopologyProbing(boolean enableTopologyProbing);
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationQueryWithNetworkPartitionIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationQueryWithNetworkPartitionIT.java
index cba551ee74a..bb00451ea8f 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationQueryWithNetworkPartitionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationQueryWithNetworkPartitionIT.java
@@ -76,7 +76,8 @@ public class IoTDBTableAggregationQueryWithNetworkPartitionIT
{
.setSchemaReplicationFactor(testReplicationFactor)
.setDataReplicationFactor(testReplicationFactor)
.setTimePartitionInterval(testTimePartitionInterval)
-
.setDefaultDataRegionGroupNumPerDatabase(testDataRegionGroupPerDatabase);
+
.setDefaultDataRegionGroupNumPerDatabase(testDataRegionGroupPerDatabase)
+ .setEnableTopologyProbing(true);
EnvFactory.getEnv().initClusterEnvironment(1, 3);
prepareTableData(createSqls);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
index e5753bf1bd1..e592ef8e5cb 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
@@ -34,6 +34,7 @@ public enum CnToDnAsyncRequestType {
SUBMIT_TEST_CONNECTION_TASK,
SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK,
TEST_CONNECTION,
+ PUSH_TOPOLOGY,
// Region Maintenance
CREATE_DATA_REGION,
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
index 8c7b389bd3d..cc4a6134eb1 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
@@ -101,6 +101,7 @@ import
org.apache.iotdb.mpp.rpc.thrift.TRollbackViewSchemaBlackListReq;
import
org.apache.iotdb.mpp.rpc.thrift.TTableDeviceDeletionWithPatternAndFilterReq;
import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceDeletionWithPatternOrModReq;
import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceInvalidateCacheReq;
+import org.apache.iotdb.mpp.rpc.thrift.TUpdateClusterTopologyReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTableReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
@@ -412,6 +413,11 @@ public class CnToDnInternalServiceAsyncRequestManager
CnToDnAsyncRequestType.TEST_CONNECTION,
(req, client, handler) ->
client.testConnectionEmptyRPC((DataNodeTSStatusRPCHandler)
handler));
+ actionMapBuilder.put(
+ CnToDnAsyncRequestType.PUSH_TOPOLOGY,
+ (req, client, handler) ->
+ client.updateClusterTopology(
+ (TUpdateClusterTopologyReq) req, (DataNodeTSStatusRPCHandler)
handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.INSERT_RECORD,
(req, client, handler) ->
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
index b2e2ec32327..ae651d1d9fc 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
@@ -202,6 +202,7 @@ public abstract class
DataNodeAsyncRequestRPCHandler<Response>
dataNodeLocationMap,
(Map<Integer, TExternalServiceListResp>) responseMap,
countDownLatch);
+ case PUSH_TOPOLOGY:
case SET_TTL:
case CREATE_DATA_REGION:
case CREATE_SCHEMA_REGION:
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 64167ba1d2e..22a337dde0f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -215,6 +215,15 @@ public class ConfigNodeConfig {
/** Acceptable pause duration for Phi accrual failure detector */
private long failureDetectorPhiAcceptablePauseInMs = 10000;
+ /** Whether to enable topology probing between DataNodes. Supports
hot-reload. */
+ private volatile boolean enableTopologyProbing = false;
+
+ /** Base interval in ms for topology probing; must be positive. */
+ private long topologyProbingBaseIntervalInMs = 5000;
+
+ /** Ratio of probing timeout to probing interval; must be in the open interval (0, 1). */
+ private double topologyProbingTimeoutRatio = 0.5;
+
/** The policy of cluster RegionGroups' leader distribution. */
private String leaderDistributionPolicy = AbstractLeaderBalancer.CFD_POLICY;
@@ -1316,4 +1325,28 @@ public class ConfigNodeConfig {
public void setFailureDetectorPhiAcceptablePauseInMs(long
failureDetectorPhiAcceptablePauseInMs) {
this.failureDetectorPhiAcceptablePauseInMs =
failureDetectorPhiAcceptablePauseInMs;
}
+
+ public boolean isEnableTopologyProbing() {
+ return enableTopologyProbing;
+ }
+
+ public void setEnableTopologyProbing(boolean enableTopologyProbing) {
+ this.enableTopologyProbing = enableTopologyProbing;
+ }
+
+ public long getTopologyProbingBaseIntervalInMs() {
+ return topologyProbingBaseIntervalInMs;
+ }
+
+ public void setTopologyProbingBaseIntervalInMs(long
topologyProbingBaseIntervalInMs) {
+ this.topologyProbingBaseIntervalInMs = topologyProbingBaseIntervalInMs;
+ }
+
+ public double getTopologyProbingTimeoutRatio() {
+ return topologyProbingTimeoutRatio;
+ }
+
+ public void setTopologyProbingTimeoutRatio(double
topologyProbingTimeoutRatio) {
+ this.topologyProbingTimeoutRatio = topologyProbingTimeoutRatio;
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index fa35565ff51..bce0d4b215a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -340,6 +340,35 @@ public class ConfigNodeDescriptor {
"failure_detector_phi_acceptable_pause_in_ms",
String.valueOf(conf.getFailureDetectorPhiAcceptablePauseInMs()))));
+ conf.setEnableTopologyProbing(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_topology_probing",
String.valueOf(conf.isEnableTopologyProbing()))));
+
+ long topologyProbingBaseIntervalInMs =
+ Long.parseLong(
+ properties.getProperty(
+ "topology_probing_base_interval_in_ms",
+ String.valueOf(conf.getTopologyProbingBaseIntervalInMs())));
+ if (topologyProbingBaseIntervalInMs <= 0) {
+ throw new IOException(
+ "topology_probing_base_interval_in_ms must be positive, but got: "
+ + topologyProbingBaseIntervalInMs);
+ }
+ conf.setTopologyProbingBaseIntervalInMs(topologyProbingBaseIntervalInMs);
+
+ double topologyProbingTimeoutRatio =
+ Double.parseDouble(
+ properties.getProperty(
+ "topology_probing_timeout_ratio",
+ String.valueOf(conf.getTopologyProbingTimeoutRatio())));
+ if (topologyProbingTimeoutRatio <= 0 || topologyProbingTimeoutRatio >=
1.0) {
+ throw new IOException(
+ "topology_probing_timeout_ratio must be in (0, 1), but got: "
+ + topologyProbingTimeoutRatio);
+ }
+ conf.setTopologyProbingTimeoutRatio(topologyProbingTimeoutRatio);
+
String leaderDistributionPolicy =
properties.getProperty("leader_distribution_policy",
conf.getLeaderDistributionPolicy());
if (AbstractLeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy)
@@ -791,6 +820,8 @@ public class ConfigNodeDescriptor {
ConfigurationFileUtils.updateAppliedProperties(properties, true);
Optional.ofNullable(properties.getProperty(IoTDBConstant.CLUSTER_NAME))
.ifPresent(conf::setClusterName);
+ Optional.ofNullable(properties.getProperty("enable_topology_probing"))
+ .ifPresent(v ->
conf.setEnableTopologyProbing(Boolean.parseBoolean(v)));
}
public static ConfigNodeDescriptor getInstance() {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index effb1c466ba..9c0f64873c4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -256,6 +256,7 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync();
configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat();
configManager.getSubscriptionManager().getSubscriptionCoordinator().stopSubscriptionMetaSync();
+ configManager.getLoadManager().stopTopologyService();
configManager.getLoadManager().stopLoadServices();
configManager.getProcedureManager().stopExecutor();
configManager.getRetryFailedTasksThread().stopRetryFailedTasksService();
@@ -288,6 +289,10 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
// Always start load services first
configManager.getLoadManager().startLoadServices();
+ if (CONF.isEnableTopologyProbing()) {
+ configManager.getLoadManager().startTopologyService();
+ }
+
// Start leader scheduling services
configManager.getProcedureManager().startExecutor();
threadPool.submit(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 327f842e966..34533362d44 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -1744,6 +1744,7 @@ public class ConfigManager implements IManager {
TrimProperties properties = new TrimProperties();
properties.putAll(req.getConfigs());
+ boolean wasTopologyProbingEnabled = CONF.isEnableTopologyProbing();
if (configurationFileFound) {
File file = new File(url.getFile());
try {
@@ -1767,6 +1768,7 @@ public class ConfigManager implements IManager {
}
LOGGER.warn(msg);
}
+ handleTopologyProbingHotReload(wasTopologyProbingEnabled);
if (currentNodeId == req.getNodeId()) {
return tsStatus;
}
@@ -1778,6 +1780,18 @@ public class ConfigManager implements IManager {
return RpcUtils.squashResponseStatusList(statusList);
}
+ private void handleTopologyProbingHotReload(boolean wasEnabled) {
+ boolean isEnabled = CONF.isEnableTopologyProbing();
+ if (wasEnabled == isEnabled) {
+ return;
+ }
+ if (isEnabled && getConsensusManager().isLeader()) {
+ getLoadManager().startTopologyService();
+ } else if (!isEnabled) {
+ getLoadManager().stopTopologyService();
+ }
+ }
+
@Override
public TSStatus startRepairData() {
TSStatus status = confirmLeader();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 993bfc0e400..e97f32bdbda 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -152,7 +152,6 @@ public class LoadManager {
statisticsService.startLoadStatisticsService();
eventService.startEventService();
partitionBalancer.setupPartitionBalancer();
- topologyService.startTopologyService();
}
public void stopLoadServices() {
@@ -162,6 +161,13 @@ public class LoadManager {
loadCache.clearHeartbeatCache();
partitionBalancer.clearPartitionBalancer();
routeBalancer.clearRegionPriority();
+ }
+
+ public void startTopologyService() {
+ topologyService.startTopologyService();
+ }
+
+ public void stopTopologyService() {
topologyService.stopTopologyService();
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index 07105d96bbd..41a0dbc5c46 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -787,7 +787,7 @@ public class LoadCache {
for (int fromId : latestTopology.keySet()) {
for (int toId : latestTopology.keySet()) {
boolean originReachable =
- latestTopology.getOrDefault(fromId,
Collections.emptySet()).contains(toId);
+ topologyGraph.getOrDefault(fromId,
Collections.emptySet()).contains(toId);
boolean newReachable =
latestTopology.getOrDefault(fromId,
Collections.emptySet()).contains(toId);
if (originReachable != newReachable) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index 64322da5bbb..100b0cbf384 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -49,7 +49,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;
@@ -168,13 +167,6 @@ public class HeartbeatService {
heartbeatReq.setSpaceQuotaUsage(configManager.getClusterQuotaManager().getSpaceQuotaUsage());
}
- final Map<Integer, Set<Integer>> topologyMap =
- configManager.getLoadManager().getLoadCache().getTopology();
- if (topologyMap != null) {
- heartbeatReq.setTopology(topologyMap);
-
heartbeatReq.setDataNodes(configManager.getNodeManager().getRegisteredDataNodeLocations());
- }
-
// We broadcast region operations list every 100 heartbeat loops
if (heartbeatCounter.get() % 100 == 0) {
heartbeatReq.setCurrentRegionOperations(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
index 9e4f6bd6121..728c2ce7ffc 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.manager.load.service;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
import org.apache.iotdb.commons.cluster.NodeStatus;
@@ -41,6 +42,7 @@ import
org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
import
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
import
org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
+import org.apache.iotdb.mpp.rpc.thrift.TUpdateClusterTopologyReq;
import org.apache.ratis.util.AwaitForSignal;
import org.apache.tsfile.utils.Pair;
@@ -49,6 +51,8 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -57,7 +61,6 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -67,8 +70,6 @@ import java.util.stream.Collectors;
public class TopologyService implements Runnable, IClusterStatusSubscriber {
private static final Logger LOGGER =
LoggerFactory.getLogger(TopologyService.class);
- private static final long PROBING_INTERVAL_MS = 5_000L;
- private static final long PROBING_TIMEOUT_MS = PROBING_INTERVAL_MS;
private static final int SAMPLING_WINDOW_SIZE = 100;
private final ExecutorService topologyThread =
@@ -85,11 +86,18 @@ public class TopologyService implements Runnable,
IClusterStatusSubscriber {
/* (fromDataNodeId, toDataNodeId) -> heartbeat history */
private final Map<Pair<Integer, Integer>, List<AbstractHeartbeatSample>>
heartbeats;
- private final List<Integer> startingDataNodes = new CopyOnWriteArrayList<>();
private final IFailureDetector failureDetector;
private static final ConfigNodeConfig CONF =
ConfigNodeDescriptor.getInstance().getConf();
+ private int proberRotationIndex = 0;
+
+ /**
+ * Last topology pushed to each DataNode. An entry is recorded for every DataNode present in the
+ * push response map; the returned TSStatus is not inspected.
+ */
+ private final Map<Integer, Set<Integer>> lastPushedTopology = new
ConcurrentHashMap<>();
+
+ /** Latest computed topology, updated each probing round. */
+ private final Map<Integer, Set<Integer>> latestTopology = new
ConcurrentHashMap<>();
+
public TopologyService(
IManager configManager, Consumer<Map<Integer, Set<Integer>>>
topologyChangeListener) {
this.configManager = configManager;
@@ -98,7 +106,6 @@ public class TopologyService implements Runnable,
IClusterStatusSubscriber {
this.shouldRun = new AtomicBoolean(false);
this.awaitForSignal = new AwaitForSignal(this.getClass().getSimpleName());
- // here we use the same failure
switch (CONF.getFailureDetector()) {
case IFailureDetector.PHI_ACCRUAL_DETECTOR:
this.failureDetector =
@@ -126,22 +133,20 @@ public class TopologyService implements Runnable,
IClusterStatusSubscriber {
public synchronized void stopTopologyService() {
shouldRun.set(false);
- future.cancel(true);
- future = null;
+ if (future != null) {
+ future.cancel(true);
+ future = null;
+ }
heartbeats.clear();
+ latestTopology.clear();
LOGGER.info("Topology Probing has stopped successfully");
}
- /**
- * Schedule the {@link #topologyProbing} task either: 1. every
PROBING_INTERVAL_MS interval. 2.
- * Manually triggered by outside events (node restart / register, etc.).
- */
private boolean mayWait() {
try {
- this.awaitForSignal.await(PROBING_INTERVAL_MS, TimeUnit.MILLISECONDS);
+ this.awaitForSignal.await(CONF.getTopologyProbingBaseIntervalInMs(),
TimeUnit.MILLISECONDS);
return true;
} catch (InterruptedException e) {
- // we don't reset the interrupt flag here since we may reuse this thread
again.
return false;
}
}
@@ -153,36 +158,70 @@ public class TopologyService implements Runnable,
IClusterStatusSubscriber {
}
}
+ /**
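+ * Select ceil(sqrt(N)) DataNodes as probers: a contiguous window (wrapping around) over the
+ * DataNodes sorted by id, advanced by one window per cycle. While the set of DataNodes is
+ * stable, every DataNode becomes a prober within ceil(N / ceil(sqrt(N))) (about sqrt(N))
+ * cycles. With at most one DataNode, the input list is returned as-is.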
+ */
+ private List<TDataNodeLocation> selectProbers(List<TDataNodeLocation>
allDataNodes) {
+ int n = allDataNodes.size();
+ if (n <= 1) {
+ return allDataNodes;
+ }
+ int sqrtN = (int) Math.ceil(Math.sqrt(n));
+ List<TDataNodeLocation> sorted = new ArrayList<>(allDataNodes);
+ sorted.sort(Comparator.comparingInt(TDataNodeLocation::getDataNodeId));
+ int startIndex = (proberRotationIndex * sqrtN) % n;
+ proberRotationIndex++;
+ List<TDataNodeLocation> probers = new ArrayList<>(sqrtN);
+ for (int i = 0; i < sqrtN && i < n; i++) {
+ probers.add(sorted.get((startIndex + i) % n));
+ }
+ return probers;
+ }
+
private synchronized void topologyProbing() {
- // 1. get the latest datanode list
+ // 1. get Running DataNodes only
final List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
final Set<Integer> dataNodeIds = new HashSet<>();
for (final TDataNodeConfiguration dataNodeConf :
configManager.getNodeManager().getRegisteredDataNodes()) {
final TDataNodeLocation location = dataNodeConf.getLocation();
- if (startingDataNodes.contains(location.getDataNodeId())) {
- continue; // we shall wait for internal endpoint to be ready
+ if
(configManager.getLoadManager().getNodeStatus(location.getDataNodeId())
+ != NodeStatus.Running) {
+ continue;
}
dataNodeLocations.add(location);
dataNodeIds.add(location.getDataNodeId());
}
- // 2. send the verify connection RPC to all datanodes
+ // 2. compute probing timeout
+ final long timeout =
+ (long) (CONF.getTopologyProbingBaseIntervalInMs() *
CONF.getTopologyProbingTimeoutRatio());
+
+ // 3. select sqrt(N) probers via rotating selection
+ final List<TDataNodeLocation> probers = selectProbers(dataNodeLocations);
+ final Set<Integer> proberIds =
+
probers.stream().map(TDataNodeLocation::getDataNodeId).collect(Collectors.toSet());
+
+ // 4. build TNodeLocations with ALL Running DataNode locations (so probers test all targets)
final TNodeLocations nodeLocations = new TNodeLocations();
nodeLocations.setDataNodeLocations(dataNodeLocations);
nodeLocations.setConfigNodeLocations(Collections.emptyList());
- final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
- configManager.getNodeManager().getRegisteredDataNodes().stream()
- .map(TDataNodeConfiguration::getLocation)
+
+ // 5. build proberLocationMap containing only the selected probers
+ final Map<Integer, TDataNodeLocation> proberLocationMap =
+ probers.stream()
.collect(Collectors.toMap(TDataNodeLocation::getDataNodeId,
location -> location));
+
+ // 6. send async requests ONLY to probers with computed timeout
final DataNodeAsyncRequestContext<TNodeLocations, TTestConnectionResp>
dataNodeAsyncRequestContext =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK,
nodeLocations,
- dataNodeLocationMap);
+ proberLocationMap);
CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext,
PROBING_TIMEOUT_MS);
+ .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, timeout);
final List<TTestConnectionResult> results = new ArrayList<>();
dataNodeAsyncRequestContext
.getResponseMap()
@@ -193,7 +232,7 @@ public class TopologyService implements Runnable,
IClusterStatusSubscriber {
}
});
- // 3. collect results and update the heartbeat timestamps
+ // 7. collect results and update the heartbeat timestamps
for (final TTestConnectionResult result : results) {
final int fromDataNodeId =
Optional.ofNullable(result.getSender().getDataNodeLocation())
@@ -203,8 +242,6 @@ public class TopologyService implements Runnable,
IClusterStatusSubscriber {
if (result.isSuccess()
&& dataNodeIds.contains(fromDataNodeId)
&& dataNodeIds.contains(toDataNodeId)) {
- // testAllDataNodeConnectionWithTimeout ensures the heartbeats are
Dn-Dn internally. Here we
- // just double-check.
final List<AbstractHeartbeatSample> heartbeatHistory =
heartbeats.computeIfAbsent(
new Pair<>(fromDataNodeId, toDataNodeId), p -> new
LinkedList<>());
@@ -215,40 +252,128 @@ public class TopologyService implements Runnable,
IClusterStatusSubscriber {
}
}
- // 4. use failure detector to identify potential network partitions
- final Map<Integer, Set<Integer>> latestTopology =
- dataNodeLocations.stream()
- .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, k ->
new HashSet<>()));
+ // 8. build topology: for non-probers carry forward previous results
(default all-connected),
+ // for probers run failure detector
+ final Map<Integer, Set<Integer>> computedTopology = new HashMap<>();
+ for (int nodeId : dataNodeIds) {
+ if (proberIds.contains(nodeId)) {
+ computedTopology.put(nodeId, new HashSet<>());
+ } else {
+ Set<Integer> prev = latestTopology.get(nodeId);
+ if (prev != null) {
+ Set<Integer> carried = new HashSet<>(prev);
+ carried.retainAll(dataNodeIds);
+ computedTopology.put(nodeId, carried);
+ } else {
+ computedTopology.put(nodeId, new HashSet<>(dataNodeIds));
+ }
+ }
+ }
+
for (final Map.Entry<Pair<Integer, Integer>,
List<AbstractHeartbeatSample>> entry :
heartbeats.entrySet()) {
final int fromId = entry.getKey().getLeft();
final int toId = entry.getKey().getRight();
+ if (!proberIds.contains(fromId)
+ || !dataNodeIds.contains(fromId)
+ || !dataNodeIds.contains(toId)) {
+ continue;
+ }
+
if (!entry.getValue().isEmpty()
&& !failureDetector.isAvailable(entry.getKey(), entry.getValue())) {
LOGGER.debug("Connection from DataNode {} to DataNode {} is broken",
fromId, toId);
} else {
- Optional.ofNullable(latestTopology.get(fromId)).ifPresent(s ->
s.add(toId));
+ computedTopology.get(fromId).add(toId);
+ }
+ }
+
+ // For prober nodes: pairs with no heartbeat history default to connected
+ for (int fromId : proberIds) {
+ if (!dataNodeIds.contains(fromId)) {
+ continue;
+ }
+ Set<Integer> reachableSet = computedTopology.get(fromId);
+ for (int toId : dataNodeIds) {
+ if (!heartbeats.containsKey(new Pair<>(fromId, toId))) {
+ reachableSet.add(toId);
+ }
}
}
- logAsymmetricPartition(latestTopology);
+ // Save computed topology for next round
+ latestTopology.clear();
+ for (Map.Entry<Integer, Set<Integer>> entry : computedTopology.entrySet())
{
+ latestTopology.put(entry.getKey(), new HashSet<>(entry.getValue()));
+ }
+
+ logAsymmetricPartition(computedTopology);
- // 5. notify the listeners on topology change
+ // 9. notify the listeners on topology change
if (shouldRun.get()) {
- topologyChangeListener.accept(latestTopology);
+ topologyChangeListener.accept(computedTopology);
}
+
+ // 10. push topology changes to DataNodes
+ pushTopologyToDataNodes(computedTopology, dataNodeLocations);
}
/**
- * We only consider warning (one vs remaining) network partition. If we need
to cover more
- * complicated scenarios like (many vs many) network partition, we shall use
graph algorithms
- * then.
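+ * Push topology changes to DataNodes via PUSH_TOPOLOGY request. Only DataNodes whose reachable
+ * set differs from the last recorded push are targeted; each receives the full DataNode location
+ * map plus only its own reachable set. lastPushedTopology is updated for every DataNode that
+ * appears in the response map; the returned TSStatus is not inspected, so if the handler records
+ * non-success statuses they still count as pushed (NOTE(review): confirm intended).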
*/
+ private void pushTopologyToDataNodes(
+ Map<Integer, Set<Integer>> computedTopology, List<TDataNodeLocation>
dataNodeLocations) {
+ final Map<Integer, TDataNodeLocation> dataNodesMap =
+ dataNodeLocations.stream()
+ .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, loc ->
loc));
+
+ final Map<Integer, TDataNodeLocation> targetMap = new HashMap<>();
+ for (final TDataNodeLocation location : dataNodeLocations) {
+ final int nodeId = location.getDataNodeId();
+ final Set<Integer> reachableSet =
+ computedTopology.getOrDefault(nodeId, Collections.emptySet());
+ final Set<Integer> lastPushed = lastPushedTopology.get(nodeId);
+ if (lastPushed != null && lastPushed.equals(reachableSet)) {
+ continue;
+ }
+ targetMap.put(nodeId, location);
+ }
+
+ if (targetMap.isEmpty()) {
+ return;
+ }
+
+ final DataNodeAsyncRequestContext<TUpdateClusterTopologyReq, TSStatus>
context =
+ new
DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.PUSH_TOPOLOGY, targetMap);
+ for (final Map.Entry<Integer, TDataNodeLocation> entry :
targetMap.entrySet()) {
+ final int nodeId = entry.getKey();
+ final Set<Integer> reachableSet =
+ computedTopology.getOrDefault(nodeId, Collections.emptySet());
+ final Map<Integer, Set<Integer>> perNodeTopology = new HashMap<>();
+ perNodeTopology.put(nodeId, new HashSet<>(reachableSet));
+ final TUpdateClusterTopologyReq req =
+ new TUpdateClusterTopologyReq(dataNodesMap, perNodeTopology);
+ context.putRequest(nodeId, req);
+ }
+
+ CnToDnInternalServiceAsyncRequestManager.getInstance()
+ .sendAsyncRequestWithTimeoutInMs(context,
CONF.getTopologyProbingBaseIntervalInMs());
+
+ context
+ .getResponseMap()
+ .forEach(
+ (nodeId, resp) -> {
+ Set<Integer> reachableSet =
+ computedTopology.getOrDefault(nodeId,
Collections.emptySet());
+ lastPushedTopology.put(nodeId, new HashSet<>(reachableSet));
+ });
+ }
+
private void logAsymmetricPartition(final Map<Integer, Set<Integer>>
topology) {
final Set<Integer> nodes = topology.keySet();
if (nodes.size() == 1) {
- // 1 DataNode
return;
}
@@ -258,11 +383,9 @@ public class TopologyService implements Runnable,
IClusterStatusSubscriber {
continue;
}
- // whether we have asymmetric partition [from -> to]
final Set<Integer> reachableFrom = topology.get(from);
final Set<Integer> reachableTo = topology.get(to);
if (reachableFrom.size() <= 1 || reachableTo.size() <= 1) {
- // symmetric partition for (from) or (to)
continue;
}
if (!reachableTo.contains(from) && !reachableFrom.contains(to)) {
@@ -272,47 +395,22 @@ public class TopologyService implements Runnable,
IClusterStatusSubscriber {
}
}
- /** We only listen to datanode remove / restart / register events */
+ /** Clean up heartbeat and push state when a node is removed or enters
Removing status. */
@Override
public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) {
- final Set<Integer> datanodeIds =
-
configManager.getNodeManager().getRegisteredDataNodeLocations().keySet();
- final Map<Integer, Pair<NodeStatistics, NodeStatistics>> changes =
- event.getDifferentNodeStatisticsMap();
for (final Map.Entry<Integer, Pair<NodeStatistics, NodeStatistics>> entry :
- changes.entrySet()) {
+ event.getDifferentNodeStatisticsMap().entrySet()) {
final Integer nodeId = entry.getKey();
- final Pair<NodeStatistics, NodeStatistics> changeEvent =
entry.getValue();
- if (!datanodeIds.contains(nodeId)) {
- continue;
- }
- if (changeEvent.getLeft() == null) {
- // if a new datanode registered, DO NOT trigger probing immediately
- startingDataNodes.add(nodeId);
- continue;
- } else {
- startingDataNodes.remove(nodeId);
- }
-
- final Set<Pair<Integer, Integer>> affectedPairs =
- heartbeats.keySet().stream()
- .filter(
- pair ->
- Objects.equals(pair.getLeft(), nodeId)
- || Objects.equals(pair.getRight(), nodeId))
- .collect(Collectors.toSet());
-
- if (changeEvent.getRight() == null) {
- // datanode removed from cluster, clean up probing history
- affectedPairs.forEach(heartbeats::remove);
- } else {
- // we only trigger probing immediately if node comes around from
UNKNOWN to RUNNING
- if (NodeStatus.Unknown.equals(changeEvent.getLeft().getStatus())
- && NodeStatus.Running.equals(changeEvent.getRight().getStatus())) {
- // let's clear the history when a new node comes around
- affectedPairs.forEach(pair -> heartbeats.put(pair, new
ArrayList<>()));
- awaitForSignal.signal();
- }
+ final Pair<NodeStatistics, NodeStatistics> change = entry.getValue();
+ if (change.getRight() == null ||
NodeStatus.Removing.equals(change.getRight().getStatus())) {
+ heartbeats
+ .keySet()
+ .removeIf(
+ pair ->
+ Objects.equals(pair.getLeft(), nodeId)
+ || Objects.equals(pair.getRight(), nodeId));
+ lastPushedTopology.remove(nodeId);
+ latestTopology.remove(nodeId);
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 84479c9dcd2..436e78906df 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -52,6 +52,7 @@ import
org.apache.iotdb.commons.client.request.AsyncRequestContext;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.concurrent.Await;
import org.apache.iotdb.commons.concurrent.AwaitTimeoutException;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
@@ -319,6 +320,7 @@ import
org.apache.iotdb.mpp.rpc.thrift.TTableDeviceDeletionWithPatternAndFilterR
import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceDeletionWithPatternOrModReq;
import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TUpdateClusterTopologyReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTableReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
@@ -373,6 +375,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -417,6 +420,14 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
private final ClusterTopology clusterTopology =
ClusterTopology.getInstance();
+ private static final long TEST_CONNECTION_TIMEOUT_MS =
+ CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS();
+
+ private static final ExecutorService TOPOLOGY_PROBING_EXECUTOR =
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ Math.max(1, Runtime.getRuntime().availableProcessors() / 4),
+ ThreadName.DATANODE_TOPOLOGY_PROBING.getName());
+
private final CommonConfig commonConfig =
CommonDescriptor.getInstance().getConfig();
private final DataNodeContext dataNodeContext;
@@ -2076,9 +2087,28 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TTestConnectionResp submitInternalTestConnectionTask(TNodeLocations
nodeLocations)
throws TException {
- return new TTestConnectionResp(
- new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
-
testAllDataNodeConnectionInHeartbeatChannel(nodeLocations.getDataNodeLocations()));
+ Future<TTestConnectionResp> future =
+ TOPOLOGY_PROBING_EXECUTOR.submit(
+ () ->
+ new TTestConnectionResp(
+ new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
+ testAllDataNodeConnectionInHeartbeatChannel(
+ nodeLocations.getDataNodeLocations())));
+ try {
+ return future.get(TEST_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ return new TTestConnectionResp(
+ new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .setMessage("Topology probing timed out after " +
TEST_CONNECTION_TIMEOUT_MS + "ms"),
+ Collections.emptyList());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new TException(e);
+ } catch (Exception e) {
+ throw new TException(e);
+ } finally {
+ future.cancel(true);
+ }
}
private static <Location, RequestType> List<TTestConnectionResult>
testConnections(
@@ -2105,7 +2135,8 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
TServiceType.ConfigNodeInternalService,
DnToCnRequestType.TEST_CONNECTION,
(AsyncRequestContext<Object, TSStatus, DnToCnRequestType,
TConfigNodeLocation> handler) ->
-
DnToCnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
+ DnToCnInternalServiceAsyncRequestManager.getInstance()
+ .sendAsyncRequestWithTimeoutInMs(handler,
TEST_CONNECTION_TIMEOUT_MS));
}
private List<TTestConnectionResult>
testAllDataNodeConnectionInHeartbeatChannel(
@@ -2117,7 +2148,8 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
TServiceType.DataNodeInternalService,
DnToDnRequestType.TEST_CONNECTION,
(AsyncRequestContext<Object, TSStatus, DnToDnRequestType,
TDataNodeLocation> handler) ->
-
DataNodeIntraHeartbeatManager.getInstance().sendAsyncRequest(handler, 1, null,
true));
+ DataNodeIntraHeartbeatManager.getInstance()
+ .sendAsyncRequest(handler, 1, TEST_CONNECTION_TIMEOUT_MS,
true));
}
private List<TTestConnectionResult> testAllDataNodeInternalServiceConnection(
@@ -2129,7 +2161,8 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
TServiceType.DataNodeInternalService,
DnToDnRequestType.TEST_CONNECTION,
(AsyncRequestContext<Object, TSStatus, DnToDnRequestType,
TDataNodeLocation> handler) ->
-
DnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
+ DnToDnInternalServiceAsyncRequestManager.getInstance()
+ .sendAsyncRequestWithTimeoutInMs(handler,
TEST_CONNECTION_TIMEOUT_MS));
}
private List<TTestConnectionResult> testAllDataNodeMPPServiceConnection(
@@ -2141,7 +2174,8 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
TServiceType.DataNodeMPPService,
DnToDnRequestType.TEST_CONNECTION,
(AsyncRequestContext<Object, TSStatus, DnToDnRequestType,
TDataNodeLocation> handler) ->
-
DataNodeMPPServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
+ DataNodeMPPServiceAsyncRequestManager.getInstance()
+ .sendAsyncRequestWithTimeoutInMs(handler,
TEST_CONNECTION_TIMEOUT_MS));
}
private List<TTestConnectionResult> testAllDataNodeExternalServiceConnection(
@@ -2153,7 +2187,8 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
TServiceType.DataNodeExternalService,
DnToDnRequestType.TEST_CONNECTION,
(AsyncRequestContext<Object, TSStatus, DnToDnRequestType,
TDataNodeLocation> handler) ->
-
DataNodeExternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
+ DataNodeExternalServiceAsyncRequestManager.getInstance()
+ .sendAsyncRequestWithTimeoutInMs(handler,
TEST_CONNECTION_TIMEOUT_MS));
}
@Override
@@ -2161,6 +2196,12 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
+ /** Applies a topology pushed by ConfigNode to this DataNode's ClusterTopology; always returns SUCCESS. */
+ @Override
+ public TSStatus updateClusterTopology(TUpdateClusterTopologyReq req) throws
TException {
+ clusterTopology.updateTopology(req.getDataNodes(), req.getTopology());
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
private PathPatternTree filterPathPatternTree(PathPatternTree patternTree,
String database) {
PathPatternTree filteredPatternTree = new PathPatternTree();
try {
@@ -2255,10 +2296,6 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
}
}
- if (req.isSetTopology() && req.isSetDataNodes()) {
- clusterTopology.updateTopology(req.getDataNodes(), req.getTopology());
- }
-
if (req.isSetCurrentRegionOperations()) {
RegionMigrateService.getInstance()
.notifyRegionMigration(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
index 64ea8b8a2ea..2af51aec26b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
@@ -39,32 +39,43 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+/**
+ * Tracks this DataNode's view of cluster network topology for partition-aware
query planning. Only
+ * stores the set of DataNodes reachable from this node, pushed by
ConfigNode's TopologyService.
+ */
public class ClusterTopology {
private static final Logger LOGGER =
LoggerFactory.getLogger(ClusterTopology.class);
private final Integer myself;
private final AtomicReference<Map<Integer, TDataNodeLocation>> dataNodes;
- private final AtomicReference<Map<Integer, Set<Integer>>> topologyMap;
+
+ /** DataNode IDs reachable from this node. Empty means not yet probed by
TopologyService. */
+ private final AtomicReference<Set<Integer>> myReachableNodes;
+
private final AtomicBoolean isPartitioned = new AtomicBoolean();
public static ClusterTopology getInstance() {
return ClusterTopologyHolder.INSTANCE;
}
+ /** Filters a replica set to only include DataNodes reachable from this
node. */
public TRegionReplicaSet getValidatedReplicaSet(TRegionReplicaSet origin) {
if (!isPartitioned.get() || origin == null) {
return origin;
}
- final Set<Integer> reachableToMyself =
- Collections.unmodifiableSet(topologyMap.get().get(myself));
+ final Set<Integer> reachable = myReachableNodes.get();
+ if (reachable.isEmpty()) {
+ return origin;
+ }
final List<TDataNodeLocation> locations = new ArrayList<>();
for (final TDataNodeLocation location : origin.getDataNodeLocations()) {
- if (reachableToMyself.contains(location.getDataNodeId())) {
+ if (reachable.contains(location.getDataNodeId())) {
locations.add(location);
}
}
return new TRegionReplicaSet(origin.getRegionId(), locations);
}
+ /** Filters region-to-scan mappings, keeping only replicas reachable from
this node. */
public <T> Set<Map.Entry<TRegionReplicaSet, T>> filterReachableCandidates(
Set<Map.Entry<TRegionReplicaSet, T>> input) {
if (!isPartitioned.get()) {
@@ -87,6 +98,9 @@ public class ClusterTopology {
final TRegionReplicaSet replicaSet = newMap.get(gid);
if (replicaSet != null) {
candidateMap.put(replicaSet, entry.getValue());
+ } else {
+ // Topology not yet available for this region — return original to
avoid silent data loss
+ candidateMap.put(entry.getKey(), entry.getValue());
}
}
return candidateMap.entrySet();
@@ -97,82 +111,61 @@ public class ClusterTopology {
return all;
}
for (TRegionReplicaSet replicaSet : all) {
- // some TRegionReplicaSet is unreachable since all DataNodes are down
if (replicaSet.getDataNodeLocationsSize() == 0) {
throw new ReplicaSetUnreachableException(replicaSet);
}
}
- final Map<Integer, Set<Integer>> topologyMapCurrent =
- Collections.unmodifiableMap(this.topologyMap.get());
-
- // brute-force search to select DataNode candidates that can communicate
to all
- // TRegionReplicaSets
- final List<Integer> dataNodeCandidates = new ArrayList<>();
- for (final Integer datanode : topologyMapCurrent.keySet()) {
- boolean reachableToAllSets = true;
- final Set<Integer> datanodeReachableToThis =
topologyMapCurrent.get(datanode);
- for (final TRegionReplicaSet replicaSet : all) {
- final List<Integer> replicaNodeLocations =
- replicaSet.getDataNodeLocations().stream()
- .map(TDataNodeLocation::getDataNodeId)
- .collect(Collectors.toList());
- replicaNodeLocations.retainAll(datanodeReachableToThis);
- reachableToAllSets = !replicaNodeLocations.isEmpty();
- }
- if (reachableToAllSets) {
- dataNodeCandidates.add(datanode);
- }
+
+ final Set<Integer> reachable = this.myReachableNodes.get();
+ if (reachable.isEmpty()) {
+ return all;
}
+ final Map<Integer, TDataNodeLocation> dataNodesCurrent =
this.dataNodes.get();
- // select TRegionReplicaSet candidates whose DataNode Locations contain at
least one
- // allReachableDataNodes
final List<TRegionReplicaSet> reachableSetCandidates = new ArrayList<>();
for (final TRegionReplicaSet replicaSet : all) {
- final List<Integer> commonLocations =
+ final List<TDataNodeLocation> validLocations =
replicaSet.getDataNodeLocations().stream()
- .map(TDataNodeLocation::getDataNodeId)
+ .filter(loc -> reachable.contains(loc.getDataNodeId()))
+ .map(loc -> dataNodesCurrent.getOrDefault(loc.getDataNodeId(),
loc))
.collect(Collectors.toList());
- commonLocations.retainAll(dataNodeCandidates);
- if (!commonLocations.isEmpty()) {
- final List<TDataNodeLocation> validLocations =
-
commonLocations.stream().map(dataNodes.get()::get).collect(Collectors.toList());
- final TRegionReplicaSet validCandidate =
- new TRegionReplicaSet(replicaSet.getRegionId(), validLocations);
- reachableSetCandidates.add(validCandidate);
+ if (!validLocations.isEmpty()) {
+ reachableSetCandidates.add(new
TRegionReplicaSet(replicaSet.getRegionId(), validLocations));
+ } else {
+ // No reachable replica — return full set so upper layer can handle or
report the error
+ reachableSetCandidates.add(replicaSet);
}
}
-
return reachableSetCandidates;
}
+ /** Called by ConfigNode's TopologyService push. Updates this node's
reachable set. */
public void updateTopology(
final Map<Integer, TDataNodeLocation> dataNodes, Map<Integer,
Set<Integer>> latestTopology) {
- if (!latestTopology.equals(topologyMap.get())) {
- LOGGER.info("[Topology] latest view from config-node: {}",
latestTopology);
- for (int fromId : dataNodes.keySet()) {
- for (int toId : dataNodes.keySet()) {
- boolean originReachable =
- latestTopology.getOrDefault(fromId,
Collections.emptySet()).contains(toId);
- boolean newReachable =
- latestTopology.getOrDefault(fromId,
Collections.emptySet()).contains(toId);
- if (originReachable != newReachable) {
- LOGGER.info(
- "[Topology] Topology of DataNode {} is now {} to DataNode {}",
- fromId,
- newReachable ? "reachable" : "unreachable",
- toId);
- }
+ final Set<Integer> newReachable = latestTopology.getOrDefault(myself,
Collections.emptySet());
+ final Set<Integer> oldReachable = this.myReachableNodes.get();
+
+ if (!newReachable.equals(oldReachable)) {
+ LOGGER.info(
+ "[Topology] latest view from config-node for myself({}): {}",
myself, newReachable);
+ for (int toId : dataNodes.keySet()) {
+ boolean wasReachable = oldReachable.contains(toId);
+ boolean nowReachable = newReachable.contains(toId);
+ if (wasReachable != nowReachable) {
+ LOGGER.info(
+ "[Topology] DataNode {} is now {} to myself({})",
+ toId,
+ nowReachable ? "reachable" : "unreachable",
+ myself);
}
}
- this.topologyMap.set(latestTopology);
+ this.myReachableNodes.set(newReachable);
}
this.dataNodes.set(dataNodes);
- if (latestTopology.get(myself) == null ||
latestTopology.get(myself).isEmpty()) {
- // latest topology doesn't include this node information.
- // This mostly happens when this node just starts and haven't report
connection details.
+ if (newReachable.isEmpty()) {
this.isPartitioned.set(false);
} else {
- this.isPartitioned.set(latestTopology.get(myself).size() !=
latestTopology.size());
+ this.isPartitioned.set(newReachable.size() != dataNodes.size());
}
}
@@ -180,7 +173,7 @@ public class ClusterTopology {
this.myself =
IoTDBDescriptor.getInstance().getConfig().generateLocalDataNodeLocation().getDataNodeId();
this.isPartitioned.set(false);
- this.topologyMap = new AtomicReference<>(Collections.emptyMap());
+ this.myReachableNodes = new AtomicReference<>(Collections.emptySet());
this.dataNodes = new AtomicReference<>(Collections.emptyMap());
}
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 54d9ccaf5ce..69bcd275250 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -735,6 +735,21 @@ failure_detector_phi_threshold=30
# Datatype: long
failure_detector_phi_acceptable_pause_in_ms=10000
+# Whether to enable topology probing between DataNodes
+# effectiveMode: hot_reload
+# Datatype: Boolean
+# enable_topology_probing=false
+
+# Base interval in ms for topology probing between DataNodes
+# effectiveMode: restart
+# Datatype: long
+# topology_probing_base_interval_in_ms=5000
+
+# Ratio of probing timeout to probing interval (must be less than 1.0)
+# effectiveMode: restart
+# Datatype: double
+# topology_probing_timeout_ratio=0.5
+
# Disk remaining threshold at which DataNode is set to ReadOnly status
# effectiveMode: restart
# Datatype: double(percentage)
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index b08416bd8c6..4bee7bd1b5b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -195,6 +195,7 @@ public enum ThreadName {
INFLUXDB_RPC_PROCESSOR("InfluxdbRPC-Processor"),
STORAGE_ENGINE_CACHED_POOL("StorageEngine"),
DATANODE_SHUTDOWN_HOOK("DataNode-Shutdown-Hook"),
+ DATANODE_TOPOLOGY_PROBING("DataNode-Topology-Probing"),
UPGRADE_TASK("UpgradeThread"),
REGION_MIGRATE("Region-Migrate-Pool"),
STORAGE_ENGINE_RECOVER_TRIGGER("StorageEngine-RecoverTrigger"),
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 4323e956a8e..92a7602b34d 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -332,6 +332,11 @@ struct TRegionRouteReq {
2: required map<common.TConsensusGroupId, common.TRegionReplicaSet>
regionRouteMap
}
+struct TUpdateClusterTopologyReq {
+ 1: required map<i32, common.TDataNodeLocation> dataNodes
+ 2: required map<i32, set<i32>> topology
+}
+
struct TUpdateTemplateReq {
1: required byte type
2: required binary templateInfo
@@ -1304,6 +1309,12 @@ service IDataNodeRPCService {
/** Empty rpc, only for connection test */
common.TSStatus testConnectionEmptyRPC()
+ /**
+ * Push cluster topology to this DataNode.
+ * Each DataNode receives only its own reachable set.
+ */
+ common.TSStatus updateClusterTopology(1:TUpdateClusterTopologyReq req)
+
/** to write audit log or other events as time series **/
common.TSStatus insertRecord(1:client.TSInsertRecordReq req);