This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch topology-log
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d8de996d3f84aed15fdcc9039cb4b42ce3ac8e77 Author: Yongzao <[email protected]> AuthorDate: Tue Jul 29 16:06:08 2025 +0800 finish --- .../confignode/manager/load/cache/LoadCache.java | 21 ++++++++++++-- .../load/cache/detector/PhiAccrualDetector.java | 33 ++++++++++++++-------- .../iotdb/db/queryengine/plan/ClusterTopology.java | 30 +++++++++++--------- .../plan/AbstractFragmentParallelPlanner.java | 2 +- 4 files changed, 58 insertions(+), 28 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java index 666c676abdf..dfc495ef90d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java @@ -780,10 +780,25 @@ public class LoadCache { public void updateTopology(Map<Integer, Set<Integer>> latestTopology) { if (!latestTopology.equals(topologyGraph)) { - LOGGER.info("[Topology Service] Cluster topology changed, latest: {}", latestTopology); + LOGGER.info("[Topology] Cluster topology changed, latest: {}", latestTopology); + for (int fromId : latestTopology.keySet()) { + for (int toId : latestTopology.keySet()) { + boolean originReachable = + latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId); + boolean newReachable = + latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId); + if (originReachable != newReachable) { + LOGGER.info( + "[Topology] Topology of DataNode {} is now {} to DataNode {}", + fromId, + newReachable ? 
"reachable" : "unreachable", + toId); + } + } + } + topologyGraph = latestTopology; + topologyUpdated.set(true); } - topologyGraph = latestTopology; - topologyUpdated.set(true); } @Nullable diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java index 3ed78e53c88..22098f75ad0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java @@ -82,22 +82,33 @@ public class PhiAccrualDetector implements IFailureDetector { final Boolean previousAvailability = availibilityCache.getIfPresent(id); availibilityCache.put(id, isAvailable); + // log the status change and dump the heartbeat history for analysis use if (Boolean.TRUE.equals(previousAvailability) && !isAvailable) { - // log the status change and dump the heartbeat history for analysis use - final StringBuilder builder = new StringBuilder(); - builder.append("["); - for (double interval : phiAccrual.heartbeatIntervals) { - final long msInterval = (long) interval / 1000_000; - builder.append(msInterval).append(", "); - } - builder.append(phiAccrual.timeElapsedSinceLastHeartbeat / 1000_000); - builder.append("]"); - LOGGER.info(String.format("Node %s Down, heartbeat history (ms): %s", id, builder)); + final StringBuilder builder = buildRecentHeartbeatHistory(phiAccrual); + LOGGER.info( + "[PhiAccrualDetector] Topology {} is broken, heartbeat history (ms): {}", id, builder); + } + if (Boolean.FALSE.equals(previousAvailability) && isAvailable) { + final StringBuilder builder = buildRecentHeartbeatHistory(phiAccrual); + LOGGER.info( + "[PhiAccrualDetector] Topology {} is recovered, heartbeat history (ms): {}", id, builder); } - return isAvailable; 
} + private StringBuilder buildRecentHeartbeatHistory(PhiAccrual phiAccrual) { + // log the status change and dump the heartbeat history for analysis use + final StringBuilder builder = new StringBuilder(); + builder.append("["); + for (double interval : phiAccrual.heartbeatIntervals) { + final long msInterval = (long) interval / 1000_000; + builder.append(msInterval).append(", "); + } + builder.append(phiAccrual.timeElapsedSinceLastHeartbeat / 1000_000); + builder.append("]"); + return builder; + } + PhiAccrual create(List<AbstractHeartbeatSample> history) { final List<Double> heartbeatIntervals = new ArrayList<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java index 7a277ce0165..64ea8b8a2ea 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java @@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -150,25 +149,30 @@ public class ClusterTopology { final Map<Integer, TDataNodeLocation> dataNodes, Map<Integer, Set<Integer>> latestTopology) { if (!latestTopology.equals(topologyMap.get())) { LOGGER.info("[Topology] latest view from config-node: {}", latestTopology); + for (int fromId : dataNodes.keySet()) { + for (int toId : dataNodes.keySet()) { + boolean originReachable = + latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId); + boolean newReachable = + latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId); + if (originReachable != newReachable) { + LOGGER.info( + "[Topology] Topology of DataNode {} is now {} to DataNode {}", + fromId, + newReachable ? 
"reachable" : "unreachable", + toId); + } + } + } + this.topologyMap.set(latestTopology); } this.dataNodes.set(dataNodes); - this.topologyMap.set(latestTopology); if (latestTopology.get(myself) == null || latestTopology.get(myself).isEmpty()) { // latest topology doesn't include this node information. // This mostly happens when this node just starts and haven't report connection details. this.isPartitioned.set(false); } else { - this.isPartitioned.set(latestTopology.get(myself).size() != latestTopology.keySet().size()); - } - if (isPartitioned.get() && LOGGER.isDebugEnabled()) { - final Set<Integer> allDataLocations = new HashSet<>(latestTopology.keySet()); - allDataLocations.removeAll(latestTopology.get(myself)); - final String partitioned = - allDataLocations.stream() - .collect( - StringBuilder::new, (sb, id) -> sb.append(",").append(id), StringBuilder::append) - .toString(); - LOGGER.debug("This DataNode {} is partitioned with [{}]", myself, partitioned); + this.isPartitioned.set(latestTopology.get(myself).size() != latestTopology.size()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java index dbf81451311..84489a630aa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java @@ -71,7 +71,7 @@ public abstract class AbstractFragmentParallelPlanner implements IFragmentParall && !CollectionUtils.isEmpty(regionReplicaSet.getDataNodeLocations())) { regionReplicaSet = validator.apply(regionReplicaSet); if (regionReplicaSet.getDataNodeLocations().isEmpty()) { - throw new ReplicaSetUnreachableException(fragment.getTargetRegionForTreeModel()); + throw new 
ReplicaSetUnreachableException(replicaSetProvider.get()); } } // Set ExecutorType and target host for the instance
