This is an automated email from the ASF dual-hosted git repository. CRZbulabula pushed a commit to branch upgrade-heartbeat-service in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2ddad6d48ebe1c580491a8249c2a96102e901052 Author: Yongzao <[email protected]> AuthorDate: Tue May 5 13:58:28 2026 +0800 ready 4 review --- .../iotdb/confignode/conf/ConfigNodeConfig.java | 33 +++++ .../confignode/conf/ConfigNodeDescriptor.java | 18 +++ .../confignode/manager/load/cache/LoadCache.java | 2 +- .../manager/load/service/HeartbeatService.java | 8 +- .../manager/load/service/TopologyService.java | 143 ++++++++++++++++++--- .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 4 +- .../impl/DataNodeInternalRPCServiceImpl.java | 42 ++++-- .../iotdb/db/queryengine/plan/ClusterTopology.java | 2 +- .../iotdb/commons/client/ClientPoolFactory.java | 6 +- .../iotdb/commons/concurrent/ThreadName.java | 1 + .../apache/iotdb/commons/conf/CommonConfig.java | 10 ++ .../iotdb/commons/conf/CommonDescriptor.java | 6 + 12 files changed, 237 insertions(+), 38 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index f305b06398d..306e6cd895f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -204,6 +204,15 @@ public class ConfigNodeConfig { /** Acceptable pause duration for Phi accrual failure detector */ private long failureDetectorPhiAcceptablePauseInMs = 10000; + /** Base interval in ms for topology probing. Actual interval scales with DataNode count. */ + private long topologyProbingBaseIntervalInMs = 5000; + + /** Reference DataNode count for adaptive probing interval scaling. */ + private int topologyProbingReferenceNodeCount = 10; + + /** Ratio of probing timeout to probing interval (must be less than 1.0). */ + private double topologyProbingTimeoutRatio = 0.5; + /** The policy of cluster RegionGroups' leader distribution. */ private String leaderDistributionPolicy = AbstractLeaderBalancer.CFD_POLICY; @@ -1288,4 +1297,28 @@ public class ConfigNodeConfig { public void setFailureDetectorPhiAcceptablePauseInMs(long failureDetectorPhiAcceptablePauseInMs) { this.failureDetectorPhiAcceptablePauseInMs = failureDetectorPhiAcceptablePauseInMs; } + + public long getTopologyProbingBaseIntervalInMs() { + return topologyProbingBaseIntervalInMs; + } + + public void setTopologyProbingBaseIntervalInMs(long topologyProbingBaseIntervalInMs) { + this.topologyProbingBaseIntervalInMs = topologyProbingBaseIntervalInMs; + } + + public int getTopologyProbingReferenceNodeCount() { + return topologyProbingReferenceNodeCount; + } + + public void setTopologyProbingReferenceNodeCount(int topologyProbingReferenceNodeCount) { + this.topologyProbingReferenceNodeCount = topologyProbingReferenceNodeCount; + } + + public double getTopologyProbingTimeoutRatio() { + return topologyProbingTimeoutRatio; + } + + public void setTopologyProbingTimeoutRatio(double topologyProbingTimeoutRatio) { + this.topologyProbingTimeoutRatio = topologyProbingTimeoutRatio; + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 77790dae1a9..506131a583a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -322,6 +322,24 @@ public class ConfigNodeDescriptor { "failure_detector_phi_acceptable_pause_in_ms", String.valueOf(conf.getFailureDetectorPhiAcceptablePauseInMs())))); + conf.setTopologyProbingBaseIntervalInMs( + Long.parseLong( + properties.getProperty( + "topology_probing_base_interval_in_ms", + String.valueOf(conf.getTopologyProbingBaseIntervalInMs())))); + + conf.setTopologyProbingReferenceNodeCount( + Integer.parseInt( + properties.getProperty( + "topology_probing_reference_node_count", + String.valueOf(conf.getTopologyProbingReferenceNodeCount())))); + + conf.setTopologyProbingTimeoutRatio( + Double.parseDouble( + properties.getProperty( + "topology_probing_timeout_ratio", + String.valueOf(conf.getTopologyProbingTimeoutRatio())))); + String leaderDistributionPolicy = properties.getProperty("leader_distribution_policy", conf.getLeaderDistributionPolicy()); if (AbstractLeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java index 07105d96bbd..41a0dbc5c46 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java @@ -787,7 +787,7 @@ public class LoadCache { for (int fromId : latestTopology.keySet()) { for (int toId : latestTopology.keySet()) { boolean originReachable = - latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId); + topologyGraph.getOrDefault(fromId, Collections.emptySet()).contains(toId); boolean newReachable = latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId); if (originReachable != newReachable) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java index 64322da5bbb..e5f65152c87 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java @@ -49,7 +49,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.Future; @@ -168,12 +167,7 @@ public class HeartbeatService { heartbeatReq.setSpaceQuotaUsage(configManager.getClusterQuotaManager().getSpaceQuotaUsage()); } - final Map<Integer, Set<Integer>> topologyMap = - configManager.getLoadManager().getLoadCache().getTopology(); - if (topologyMap != null) { - heartbeatReq.setTopology(topologyMap); - heartbeatReq.setDataNodes(configManager.getNodeManager().getRegisteredDataNodeLocations()); - } + // Topology is now pushed independently by TopologyService, no longer piggybacked on heartbeat // We broadcast region operations list every 100 heartbeat loops if (heartbeatCounter.get() % 100 == 0) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java index 9e4f6bd6121..1fd716fc7fc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java @@ -21,9 +21,13 @@ package org.apache.iotdb.confignode.manager.load.service; import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TNodeLocations; import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp; import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult; +import org.apache.iotdb.commons.client.ClientPoolFactory; +import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; @@ -41,14 +45,18 @@ import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample; import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber; import org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent; +import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq; +import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp; import org.apache.ratis.util.AwaitForSignal; +import org.apache.thrift.async.AsyncMethodCallback; import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -67,8 +75,6 @@ import java.util.stream.Collectors; public class TopologyService implements Runnable, IClusterStatusSubscriber { private static final Logger LOGGER = LoggerFactory.getLogger(TopologyService.class); - private static final long PROBING_INTERVAL_MS = 5_000L; - private static final long PROBING_TIMEOUT_MS = PROBING_INTERVAL_MS; private static final int SAMPLING_WINDOW_SIZE = 100; private final ExecutorService topologyThread = @@ -90,6 +96,16 @@ public class TopologyService implements Runnable, IClusterStatusSubscriber { private final IFailureDetector failureDetector; private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); + /** Rotation index for sqrt(N) prober selection. */ + private int proberRotationIndex = 0; + + /** Last topology pushed to each DataNode, used to detect changes. */ + private final Map<Integer, Set<Integer>> lastPushedTopology = new ConcurrentHashMap<>(); + + /** Client manager for pushing topology updates to DataNodes via heartbeat RPC. */ + private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> + topologyPushClientManager; + public TopologyService( IManager configManager, Consumer<Map<Integer, Set<Integer>>> topologyChangeListener) { this.configManager = configManager; @@ -97,6 +113,10 @@ public class TopologyService implements Runnable, IClusterStatusSubscriber { this.heartbeats = new ConcurrentHashMap<>(); this.shouldRun = new AtomicBoolean(false); this.awaitForSignal = new AwaitForSignal(this.getClass().getSimpleName()); + this.topologyPushClientManager = + new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>() + .createClientManager( + new ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory()); // here we use the same failure switch (CONF.getFailureDetector()) { @@ -133,12 +153,17 @@ public class TopologyService implements Runnable, IClusterStatusSubscriber { } /** - * Schedule the {@link #topologyProbing} task either: 1. every PROBING_INTERVAL_MS interval. 2. + * Schedule the {@link #topologyProbing} task either: 1. every adaptive probing interval. 2. * Manually triggered by outside events (node restart / register, etc.). */ private boolean mayWait() { try { - this.awaitForSignal.await(PROBING_INTERVAL_MS, TimeUnit.MILLISECONDS); + long baseInterval = CONF.getTopologyProbingBaseIntervalInMs(); + int dataNodeCount = + configManager.getNodeManager().getRegisteredDataNodes().size() - startingDataNodes.size(); + int referenceNodeCount = CONF.getTopologyProbingReferenceNodeCount(); + long interval = Math.max(baseInterval, baseInterval * dataNodeCount / referenceNodeCount); + this.awaitForSignal.await(interval, TimeUnit.MILLISECONDS); return true; } catch (InterruptedException e) { // we don't reset the interrupt flag here since we may reuse this thread again. @@ -153,8 +178,29 @@ public class TopologyService implements Runnable, IClusterStatusSubscriber { } } + /** + * Select sqrt(N) DataNodes as probers, rotating through all DataNodes across cycles so that every + * DataNode gets to be a prober over sqrt(N) cycles. + */ + private List<TDataNodeLocation> selectProbers(List<TDataNodeLocation> allDataNodes) { + int n = allDataNodes.size(); + if (n <= 1) { + return allDataNodes; + } + int sqrtN = (int) Math.ceil(Math.sqrt(n)); + List<TDataNodeLocation> sorted = new ArrayList<>(allDataNodes); + sorted.sort(Comparator.comparingInt(TDataNodeLocation::getDataNodeId)); + int startIndex = (proberRotationIndex * sqrtN) % n; + proberRotationIndex++; + List<TDataNodeLocation> probers = new ArrayList<>(sqrtN); + for (int i = 0; i < sqrtN && i < n; i++) { + probers.add(sorted.get((startIndex + i) % n)); + } + return probers; + } + private synchronized void topologyProbing() { - // 1. get the latest datanode list + // 1. get the latest datanode list, filter out starting ones final List<TDataNodeLocation> dataNodeLocations = new ArrayList<>(); final Set<Integer> dataNodeIds = new HashSet<>(); for (final TDataNodeConfiguration dataNodeConf : @@ -167,22 +213,36 @@ public class TopologyService implements Runnable, IClusterStatusSubscriber { dataNodeIds.add(location.getDataNodeId()); } - // 2. send the verify connection RPC to all datanodes + // 2. compute adaptive interval and timeout from N = dataNodeLocations.size() + final long baseInterval = CONF.getTopologyProbingBaseIntervalInMs(); + final int referenceNodeCount = CONF.getTopologyProbingReferenceNodeCount(); + final double timeoutRatio = CONF.getTopologyProbingTimeoutRatio(); + final int dataNodeCount = dataNodeLocations.size(); + final long interval = Math.max(baseInterval, baseInterval * dataNodeCount / referenceNodeCount); + final long timeout = (long) (interval * timeoutRatio); + + // 3. select sqrt(N) probers via rotating selection + final List<TDataNodeLocation> probers = selectProbers(dataNodeLocations); + + // 4. build TNodeLocations with ALL DataNode locations (so probers test all targets) final TNodeLocations nodeLocations = new TNodeLocations(); nodeLocations.setDataNodeLocations(dataNodeLocations); nodeLocations.setConfigNodeLocations(Collections.emptyList()); - final Map<Integer, TDataNodeLocation> dataNodeLocationMap = - configManager.getNodeManager().getRegisteredDataNodes().stream() - .map(TDataNodeConfiguration::getLocation) + + // 5. build proberLocationMap containing only the selected probers + final Map<Integer, TDataNodeLocation> proberLocationMap = + probers.stream() .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, location -> location)); + + // 6. send async requests ONLY to probers (not all DataNodes) with computed timeout final DataNodeAsyncRequestContext<TNodeLocations, TTestConnectionResp> dataNodeAsyncRequestContext = new DataNodeAsyncRequestContext<>( CnToDnAsyncRequestType.SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK, nodeLocations, - dataNodeLocationMap); + proberLocationMap); CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, PROBING_TIMEOUT_MS); + .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, timeout); final List<TTestConnectionResult> results = new ArrayList<>(); dataNodeAsyncRequestContext .getResponseMap() @@ -193,7 +253,7 @@ public class TopologyService implements Runnable, IClusterStatusSubscriber { } }); - // 3. collect results and update the heartbeat timestamps + // 7. collect results and update the heartbeat timestamps for (final TTestConnectionResult result : results) { final int fromDataNodeId = Optional.ofNullable(result.getSender().getDataNodeLocation()) @@ -203,8 +263,6 @@ public class TopologyService implements Runnable, IClusterStatusSubscriber { if (result.isSuccess() && dataNodeIds.contains(fromDataNodeId) && dataNodeIds.contains(toDataNodeId)) { - // testAllDataNodeConnectionWithTimeout ensures the heartbeats are Dn-Dn internally. Here we - // just double-check. final List<AbstractHeartbeatSample> heartbeatHistory = heartbeats.computeIfAbsent( new Pair<>(fromDataNodeId, toDataNodeId), p -> new LinkedList<>()); @@ -215,7 +273,7 @@ public class TopologyService implements Runnable, IClusterStatusSubscriber { } } - // 4. use failure detector to identify potential network partitions + // 8. use failure detector to identify potential network partitions (on ALL heartbeat pairs) final Map<Integer, Set<Integer>> latestTopology = dataNodeLocations.stream() .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, k -> new HashSet<>())); @@ -234,10 +292,63 @@ public class TopologyService implements Runnable, IClusterStatusSubscriber { logAsymmetricPartition(latestTopology); - // 5. notify the listeners on topology change + // 9. notify the listeners on topology change if (shouldRun.get()) { topologyChangeListener.accept(latestTopology); } + + // 10. push topology changes to DataNodes + pushTopologyToDataNodes(latestTopology, dataNodeLocations); + } + + /** + * Push topology changes to DataNodes via heartbeat RPC. Each DataNode only receives a push when + * its own reachable set has changed since the last push. + */ + private void pushTopologyToDataNodes( + Map<Integer, Set<Integer>> latestTopology, List<TDataNodeLocation> dataNodeLocations) { + // Build dataNodes map once for all pushes + final Map<Integer, TDataNodeLocation> dataNodesMap = + dataNodeLocations.stream() + .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, loc -> loc)); + + for (final TDataNodeLocation location : dataNodeLocations) { + final int nodeId = location.getDataNodeId(); + final Set<Integer> reachableSet = latestTopology.getOrDefault(nodeId, Collections.emptySet()); + final Set<Integer> lastPushed = lastPushedTopology.get(nodeId); + + if (lastPushed != null && lastPushed.equals(reachableSet)) { + continue; + } + + final TDataNodeHeartbeatReq req = + new TDataNodeHeartbeatReq(System.nanoTime(), false, false, 0L, 0L); + req.setTopology(latestTopology); + req.setDataNodes(dataNodesMap); + + final TEndPoint endPoint = location.getInternalEndPoint(); + try { + topologyPushClientManager + .borrowClient(endPoint) + .getDataNodeHeartBeat( + req, + new AsyncMethodCallback<TDataNodeHeartbeatResp>() { + @Override + public void onComplete(TDataNodeHeartbeatResp response) { + // No-op: topology push is fire-and-forget + } + + @Override + public void onError(Exception exception) { + // No-op: topology push failures are silently ignored + } + }); + lastPushedTopology.put(nodeId, new HashSet<>(reachableSet)); + } catch (Exception e) { + LOGGER.debug( + "Failed to push topology to DataNode {} at {}: {}", nodeId, endPoint, e.getMessage()); + } + } } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 031cfd3a62e..bc0a619afd8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.concurrent.IoTThreadFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor; -import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.consensus.SchemaRegionId; import org.apache.iotdb.commons.consensus.index.ProgressIndex; @@ -377,8 +376,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { ///////////////////////// Heartbeat ///////////////////////// public void collectPipeMetaList(final TDataNodeHeartbeatResp resp) throws TException { - if (!tryReadLockWithTimeOut( - CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 3)) { + if (!tryReadLockWithTimeOut(2)) { return; } try { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 84479c9dcd2..e889d936ace 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -52,6 +52,7 @@ import org.apache.iotdb.commons.client.request.AsyncRequestContext; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.concurrent.Await; import org.apache.iotdb.commons.concurrent.AwaitTimeoutException; +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.IoTThreadFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor; @@ -417,6 +418,12 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface private final ClusterTopology clusterTopology = ClusterTopology.getInstance(); + private static final long TEST_CONNECTION_TIMEOUT_MS = + CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS(); + + private static final ExecutorService TOPOLOGY_PROBING_EXECUTOR = + IoTDBThreadPoolFactory.newFixedThreadPool(2, ThreadName.DATANODE_TOPOLOGY_PROBING.getName()); + private final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); private final DataNodeContext dataNodeContext; @@ -2076,9 +2083,23 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface @Override public TTestConnectionResp submitInternalTestConnectionTask(TNodeLocations nodeLocations) throws TException { - return new TTestConnectionResp( - new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), - testAllDataNodeConnectionInHeartbeatChannel(nodeLocations.getDataNodeLocations())); + try { + Future<TTestConnectionResp> future = + TOPOLOGY_PROBING_EXECUTOR.submit( + () -> + new TTestConnectionResp( + new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), + testAllDataNodeConnectionInHeartbeatChannel( + nodeLocations.getDataNodeLocations()))); + return future.get(TEST_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + return new TTestConnectionResp( + new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage("Topology probing timed out after " + TEST_CONNECTION_TIMEOUT_MS + "ms"), + Collections.emptyList()); + } catch (Exception e) { + throw new TException(e); + } } private static <Location, RequestType> List<TTestConnectionResult> testConnections( @@ -2105,7 +2126,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface TServiceType.ConfigNodeInternalService, DnToCnRequestType.TEST_CONNECTION, (AsyncRequestContext<Object, TSStatus, DnToCnRequestType, TConfigNodeLocation> handler) -> - DnToCnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler)); + DnToCnInternalServiceAsyncRequestManager.getInstance() + .sendAsyncRequestWithTimeoutInMs(handler, TEST_CONNECTION_TIMEOUT_MS)); } private List<TTestConnectionResult> testAllDataNodeConnectionInHeartbeatChannel( @@ -2117,7 +2139,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface TServiceType.DataNodeInternalService, DnToDnRequestType.TEST_CONNECTION, (AsyncRequestContext<Object, TSStatus, DnToDnRequestType, TDataNodeLocation> handler) -> - DataNodeIntraHeartbeatManager.getInstance().sendAsyncRequest(handler, 1, null, true)); + DataNodeIntraHeartbeatManager.getInstance() + .sendAsyncRequest(handler, 1, TEST_CONNECTION_TIMEOUT_MS, true)); } private List<TTestConnectionResult> testAllDataNodeInternalServiceConnection( @@ -2129,7 +2152,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface TServiceType.DataNodeInternalService, DnToDnRequestType.TEST_CONNECTION, (AsyncRequestContext<Object, TSStatus, DnToDnRequestType, TDataNodeLocation> handler) -> - DnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler)); + DnToDnInternalServiceAsyncRequestManager.getInstance() + .sendAsyncRequestWithTimeoutInMs(handler, TEST_CONNECTION_TIMEOUT_MS)); } private List<TTestConnectionResult> testAllDataNodeMPPServiceConnection( @@ -2141,7 +2165,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface TServiceType.DataNodeMPPService, DnToDnRequestType.TEST_CONNECTION, (AsyncRequestContext<Object, TSStatus, DnToDnRequestType, TDataNodeLocation> handler) -> - DataNodeMPPServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler)); + DataNodeMPPServiceAsyncRequestManager.getInstance() + .sendAsyncRequestWithTimeoutInMs(handler, TEST_CONNECTION_TIMEOUT_MS)); } private List<TTestConnectionResult> testAllDataNodeExternalServiceConnection( @@ -2153,7 +2178,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface TServiceType.DataNodeExternalService, DnToDnRequestType.TEST_CONNECTION, (AsyncRequestContext<Object, TSStatus, DnToDnRequestType, TDataNodeLocation> handler) -> - DataNodeExternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler)); + DataNodeExternalServiceAsyncRequestManager.getInstance() + .sendAsyncRequestWithTimeoutInMs(handler, TEST_CONNECTION_TIMEOUT_MS)); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java index 64ea8b8a2ea..194d9f8c6cd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java @@ -152,7 +152,7 @@ public class ClusterTopology { for (int fromId : dataNodes.keySet()) { for (int toId : dataNodes.keySet()) { boolean originReachable = - latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId); + this.topologyMap.get().getOrDefault(fromId, Collections.emptySet()).contains(toId); boolean newReachable = latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId); if (originReachable != newReachable) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java index ea20f9c76cc..f9ffca123b5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java @@ -182,7 +182,8 @@ public class ClientPoolFactory { new ThriftClientProperty.Builder() .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS()) .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) + .setSelectorNumOfAsyncClientManager( + conf.getHeartbeatSelectorNumOfClientManager()) .setPrintLogWhenEncounterException(false) .build(), ThreadName.ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL.getName()), @@ -207,7 +208,8 @@ public class ClientPoolFactory { new ThriftClientProperty.Builder() .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS()) .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) + .setSelectorNumOfAsyncClientManager( + conf.getHeartbeatSelectorNumOfClientManager()) .setPrintLogWhenEncounterException(false) .build(), ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index 20fa9d78d59..ad22c8b20ae 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -196,6 +196,7 @@ public enum ThreadName { INFLUXDB_RPC_PROCESSOR("InfluxdbRPC-Processor"), STORAGE_ENGINE_CACHED_POOL("StorageEngine"), DATANODE_SHUTDOWN_HOOK("DataNode-Shutdown-Hook"), + DATANODE_TOPOLOGY_PROBING("DataNode-Topology-Probing"), UPGRADE_TASK("UpgradeThread"), REGION_MIGRATE("Region-Migrate-Pool"), STORAGE_ENGINE_RECOVER_TRIGGER("StorageEngine-RecoverTrigger"), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 19313723e55..598c9d5f021 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -155,6 +155,8 @@ public class CommonConfig { */ private int selectorNumOfClientManager = 1; + private int heartbeatSelectorNumOfClientManager = 4; + /** Whether to use thrift compression. */ private boolean isRpcThriftCompressionEnabled = false; @@ -697,6 +699,14 @@ public class CommonConfig { this.selectorNumOfClientManager = selectorNumOfClientManager; } + public int getHeartbeatSelectorNumOfClientManager() { + return heartbeatSelectorNumOfClientManager; + } + + public void setHeartbeatSelectorNumOfClientManager(int heartbeatSelectorNumOfClientManager) { + this.heartbeatSelectorNumOfClientManager = heartbeatSelectorNumOfClientManager; + } + public boolean isRpcThriftCompressionEnabled() { return isRpcThriftCompressionEnabled; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index d392a60bbbd..11db54be34f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -178,6 +178,12 @@ public class CommonDescriptor { String.valueOf(config.getSelectorNumOfClientManager())) .trim())); + config.setHeartbeatSelectorNumOfClientManager( + Integer.parseInt( + properties.getProperty( + "heartbeat_selector_num_of_client_manager", + String.valueOf(config.getHeartbeatSelectorNumOfClientManager())))); + config.setMaxClientNumForEachNode( Integer.parseInt( properties
