This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch topology-log
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d8de996d3f84aed15fdcc9039cb4b42ce3ac8e77
Author: Yongzao <[email protected]>
AuthorDate: Tue Jul 29 16:06:08 2025 +0800

    finish
---
 .../confignode/manager/load/cache/LoadCache.java   | 21 ++++++++++++--
 .../load/cache/detector/PhiAccrualDetector.java    | 33 ++++++++++++++--------
 .../iotdb/db/queryengine/plan/ClusterTopology.java | 30 +++++++++++---------
 .../plan/AbstractFragmentParallelPlanner.java      |  2 +-
 4 files changed, 58 insertions(+), 28 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index 666c676abdf..dfc495ef90d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -780,10 +780,25 @@ public class LoadCache {
 
   public void updateTopology(Map<Integer, Set<Integer>> latestTopology) {
     if (!latestTopology.equals(topologyGraph)) {
-      LOGGER.info("[Topology Service] Cluster topology changed, latest: {}", 
latestTopology);
+      LOGGER.info("[Topology] Cluster topology changed, latest: {}", 
latestTopology);
+      for (int fromId : latestTopology.keySet()) {
+        for (int toId : latestTopology.keySet()) {
+          boolean originReachable =
+              latestTopology.getOrDefault(fromId, 
Collections.emptySet()).contains(toId);
+          boolean newReachable =
+              latestTopology.getOrDefault(fromId, 
Collections.emptySet()).contains(toId);
+          if (originReachable != newReachable) {
+            LOGGER.info(
+                "[Topology] Topology of DataNode {} is now {} to DataNode {}",
+                fromId,
+                newReachable ? "reachable" : "unreachable",
+                toId);
+          }
+        }
+      }
+      topologyGraph = latestTopology;
+      topologyUpdated.set(true);
     }
-    topologyGraph = latestTopology;
-    topologyUpdated.set(true);
   }
 
   @Nullable
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
index 3ed78e53c88..22098f75ad0 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
@@ -82,22 +82,33 @@ public class PhiAccrualDetector implements IFailureDetector 
{
     final Boolean previousAvailability = availibilityCache.getIfPresent(id);
     availibilityCache.put(id, isAvailable);
 
+    // log the status change and dump the heartbeat history for analysis use
     if (Boolean.TRUE.equals(previousAvailability) && !isAvailable) {
-      // log the status change and dump the heartbeat history for analysis use
-      final StringBuilder builder = new StringBuilder();
-      builder.append("[");
-      for (double interval : phiAccrual.heartbeatIntervals) {
-        final long msInterval = (long) interval / 1000_000;
-        builder.append(msInterval).append(", ");
-      }
-      builder.append(phiAccrual.timeElapsedSinceLastHeartbeat / 1000_000);
-      builder.append("]");
-      LOGGER.info(String.format("Node %s Down, heartbeat history (ms): %s", 
id, builder));
+      final StringBuilder builder = buildRecentHeartbeatHistory(phiAccrual);
+      LOGGER.info(
+          "[PhiAccrualDetector] Topology {} is broken, heartbeat history (ms): 
{}", id, builder);
+    }
+    if (Boolean.FALSE.equals(previousAvailability) && isAvailable) {
+      final StringBuilder builder = buildRecentHeartbeatHistory(phiAccrual);
+      LOGGER.info(
+          "[PhiAccrualDetector] Topology {} is recovered, heartbeat history 
(ms): {}", id, builder);
     }
-
     return isAvailable;
   }
 
+  private StringBuilder buildRecentHeartbeatHistory(PhiAccrual phiAccrual) {
+    // log the status change and dump the heartbeat history for analysis use
+    final StringBuilder builder = new StringBuilder();
+    builder.append("[");
+    for (double interval : phiAccrual.heartbeatIntervals) {
+      final long msInterval = (long) interval / 1000_000;
+      builder.append(msInterval).append(", ");
+    }
+    builder.append(phiAccrual.timeElapsedSinceLastHeartbeat / 1000_000);
+    builder.append("]");
+    return builder;
+  }
+
   PhiAccrual create(List<AbstractHeartbeatSample> history) {
     final List<Double> heartbeatIntervals = new ArrayList<>();
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
index 7a277ce0165..64ea8b8a2ea 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -150,25 +149,30 @@ public class ClusterTopology {
       final Map<Integer, TDataNodeLocation> dataNodes, Map<Integer, 
Set<Integer>> latestTopology) {
     if (!latestTopology.equals(topologyMap.get())) {
       LOGGER.info("[Topology] latest view from config-node: {}", 
latestTopology);
+      for (int fromId : dataNodes.keySet()) {
+        for (int toId : dataNodes.keySet()) {
+          boolean originReachable =
+              latestTopology.getOrDefault(fromId, 
Collections.emptySet()).contains(toId);
+          boolean newReachable =
+              latestTopology.getOrDefault(fromId, 
Collections.emptySet()).contains(toId);
+          if (originReachable != newReachable) {
+            LOGGER.info(
+                "[Topology] Topology of DataNode {} is now {} to DataNode {}",
+                fromId,
+                newReachable ? "reachable" : "unreachable",
+                toId);
+          }
+        }
+      }
+      this.topologyMap.set(latestTopology);
     }
     this.dataNodes.set(dataNodes);
-    this.topologyMap.set(latestTopology);
     if (latestTopology.get(myself) == null || 
latestTopology.get(myself).isEmpty()) {
       // latest topology doesn't include this node information.
       // This mostly happens when this node just starts and haven't report 
connection details.
       this.isPartitioned.set(false);
     } else {
-      this.isPartitioned.set(latestTopology.get(myself).size() != 
latestTopology.keySet().size());
-    }
-    if (isPartitioned.get() && LOGGER.isDebugEnabled()) {
-      final Set<Integer> allDataLocations = new 
HashSet<>(latestTopology.keySet());
-      allDataLocations.removeAll(latestTopology.get(myself));
-      final String partitioned =
-          allDataLocations.stream()
-              .collect(
-                  StringBuilder::new, (sb, id) -> sb.append(",").append(id), 
StringBuilder::append)
-              .toString();
-      LOGGER.debug("This DataNode {} is partitioned with [{}]", myself, 
partitioned);
+      this.isPartitioned.set(latestTopology.get(myself).size() != 
latestTopology.size());
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
index dbf81451311..84489a630aa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
@@ -71,7 +71,7 @@ public abstract class AbstractFragmentParallelPlanner 
implements IFragmentParall
         && !CollectionUtils.isEmpty(regionReplicaSet.getDataNodeLocations())) {
       regionReplicaSet = validator.apply(regionReplicaSet);
       if (regionReplicaSet.getDataNodeLocations().isEmpty()) {
-        throw new 
ReplicaSetUnreachableException(fragment.getTargetRegionForTreeModel());
+        throw new ReplicaSetUnreachableException(replicaSetProvider.get());
       }
     }
     // Set ExecutorType and target host for the instance

Reply via email to