This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2113b5a171a Address NPE caused by topology service
2113b5a171a is described below
commit 2113b5a171a638fb8f58a7d84543785557a3b6b1
Author: Yongzao <[email protected]>
AuthorDate: Tue Jul 29 20:22:20 2025 +0800
Address NPE caused by topology service
---
.../confignode/manager/load/cache/LoadCache.java | 21 ++++++++++++--
.../load/cache/detector/PhiAccrualDetector.java | 33 ++++++++++++++--------
.../iotdb/db/queryengine/plan/ClusterTopology.java | 30 +++++++++++---------
.../plan/AbstractFragmentParallelPlanner.java | 2 +-
4 files changed, 58 insertions(+), 28 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index 666c676abdf..dfc495ef90d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -780,10 +780,27 @@ public class LoadCache {
public void updateTopology(Map<Integer, Set<Integer>> latestTopology) {
if (!latestTopology.equals(topologyGraph)) {
- LOGGER.info("[Topology Service] Cluster topology changed, latest: {}",
latestTopology);
+ LOGGER.info("[Topology] Cluster topology changed, latest: {}",
latestTopology);
+ // Diff the previous snapshot (topologyGraph) against latestTopology for every node pair.
+ for (int fromId : latestTopology.keySet()) {
+ for (int toId : latestTopology.keySet()) {
+ boolean originReachable =
+ topologyGraph != null
+ && topologyGraph.getOrDefault(fromId, Collections.emptySet()).contains(toId);
+ boolean newReachable =
+ latestTopology.getOrDefault(fromId,
Collections.emptySet()).contains(toId);
+ if (originReachable != newReachable) {
+ LOGGER.info(
+ "[Topology] Topology of DataNode {} is now {} to DataNode {}",
+ fromId,
+ newReachable ? "reachable" : "unreachable",
+ toId);
+ }
+ }
+ }
+ topologyGraph = latestTopology;
+ topologyUpdated.set(true);
}
- topologyGraph = latestTopology;
- topologyUpdated.set(true);
}
@Nullable
@Nullable
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
index 3ed78e53c88..22098f75ad0 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
@@ -82,22 +82,33 @@ public class PhiAccrualDetector implements IFailureDetector
{
final Boolean previousAvailability = availibilityCache.getIfPresent(id);
availibilityCache.put(id, isAvailable);
+ // log the status change and dump the heartbeat history for analysis use
if (Boolean.TRUE.equals(previousAvailability) && !isAvailable) {
- // log the status change and dump the heartbeat history for analysis use
- final StringBuilder builder = new StringBuilder();
- builder.append("[");
- for (double interval : phiAccrual.heartbeatIntervals) {
- final long msInterval = (long) interval / 1000_000;
- builder.append(msInterval).append(", ");
- }
- builder.append(phiAccrual.timeElapsedSinceLastHeartbeat / 1000_000);
- builder.append("]");
- LOGGER.info(String.format("Node %s Down, heartbeat history (ms): %s",
id, builder));
+ final StringBuilder builder = buildRecentHeartbeatHistory(phiAccrual);
+ LOGGER.info(
+ "[PhiAccrualDetector] Topology {} is broken, heartbeat history (ms):
{}", id, builder);
+ }
+ if (Boolean.FALSE.equals(previousAvailability) && isAvailable) {
+ final StringBuilder builder = buildRecentHeartbeatHistory(phiAccrual);
+ LOGGER.info(
+ "[PhiAccrualDetector] Topology {} is recovered, heartbeat history
(ms): {}", id, builder);
}
-
return isAvailable;
}
+ private StringBuilder buildRecentHeartbeatHistory(PhiAccrual phiAccrual) {
+ // Format "[i1, i2, ..., elapsed]": each value divided by 1_000_000 (presumably ns -> ms; confirm).
+ final StringBuilder builder = new StringBuilder();
+ builder.append("[");
+ for (double interval : phiAccrual.heartbeatIntervals) {
+ final long msInterval = (long) interval / 1_000_000;
+ builder.append(msInterval).append(", ");
+ }
+ builder.append(phiAccrual.timeElapsedSinceLastHeartbeat / 1_000_000);
+ builder.append("]");
+ return builder;
+ }
+
PhiAccrual create(List<AbstractHeartbeatSample> history) {
final List<Double> heartbeatIntervals = new ArrayList<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
index 7a277ce0165..64ea8b8a2ea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -150,25 +149,34 @@ public class ClusterTopology {
final Map<Integer, TDataNodeLocation> dataNodes, Map<Integer,
Set<Integer>> latestTopology) {
if (!latestTopology.equals(topologyMap.get())) {
LOGGER.info("[Topology] latest view from config-node: {}",
latestTopology);
+ final Map<Integer, Set<Integer>> previousTopology = topologyMap.get();
+ // No previous view to diff against yet (may be null); the full view is logged above.
+ if (previousTopology != null) {
+ for (int fromId : dataNodes.keySet()) {
+ for (int toId : dataNodes.keySet()) {
+ boolean originReachable =
+ previousTopology.getOrDefault(fromId,
Collections.emptySet()).contains(toId);
+ boolean newReachable =
+ latestTopology.getOrDefault(fromId,
Collections.emptySet()).contains(toId);
+ if (originReachable != newReachable) {
+ LOGGER.info(
+ "[Topology] Topology of DataNode {} is now {} to DataNode {}",
+ fromId,
+ newReachable ? "reachable" : "unreachable",
+ toId);
+ }
+ }
+ }
+ }
+ this.topologyMap.set(latestTopology);
}
this.dataNodes.set(dataNodes);
- this.topologyMap.set(latestTopology);
if (latestTopology.get(myself) == null ||
latestTopology.get(myself).isEmpty()) {
// latest topology doesn't include this node information.
// This mostly happens when this node just starts and haven't report
connection details.
this.isPartitioned.set(false);
} else {
- this.isPartitioned.set(latestTopology.get(myself).size() !=
latestTopology.keySet().size());
- }
- if (isPartitioned.get() && LOGGER.isDebugEnabled()) {
- final Set<Integer> allDataLocations = new
HashSet<>(latestTopology.keySet());
- allDataLocations.removeAll(latestTopology.get(myself));
- final String partitioned =
- allDataLocations.stream()
- .collect(
- StringBuilder::new, (sb, id) -> sb.append(",").append(id),
StringBuilder::append)
- .toString();
- LOGGER.debug("This DataNode {} is partitioned with [{}]", myself,
partitioned);
+ this.isPartitioned.set(latestTopology.get(myself).size() !=
latestTopology.size());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
index dbf81451311..84489a630aa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
@@ -71,7 +71,7 @@ public abstract class AbstractFragmentParallelPlanner
implements IFragmentParall
&& !CollectionUtils.isEmpty(regionReplicaSet.getDataNodeLocations())) {
regionReplicaSet = validator.apply(regionReplicaSet);
if (regionReplicaSet.getDataNodeLocations().isEmpty()) {
- throw new
ReplicaSetUnreachableException(fragment.getTargetRegionForTreeModel());
+ throw new ReplicaSetUnreachableException(replicaSetProvider.get());
}
}
// Set ExecutorType and target host for the instance