This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9c1eb55dc51 [IOTDB-6208] Node error detection through broken thrift
pipe (#11397)
9c1eb55dc51 is described below
commit 9c1eb55dc5125ecc0ea01e8746f711725379c224
Author: Yongzao <[email protected]>
AuthorDate: Fri Oct 27 18:58:51 2023 +0800
[IOTDB-6208] Node error detection through broken thrift pipe (#11397)
---
.../heartbeat/ConfigNodeHeartbeatHandler.java | 19 ++++++++++++++-----
.../handlers/heartbeat/DataNodeHeartbeatHandler.java | 8 +++++++-
2 files changed, 21 insertions(+), 6 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
index 9d7e11b23b8..69f72df9702 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.confignode.client.async.handlers.heartbeat;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
@@ -27,21 +30,27 @@ import org.apache.thrift.async.AsyncMethodCallback;
public class ConfigNodeHeartbeatHandler implements AsyncMethodCallback<Long> {
private final int nodeId;
- private final LoadCache cache;
+ private final LoadCache loadCache;
- public ConfigNodeHeartbeatHandler(int nodeId, LoadCache cache) {
+ public ConfigNodeHeartbeatHandler(int nodeId, LoadCache loadCache) {
this.nodeId = nodeId;
- this.cache = cache;
+ this.loadCache = loadCache;
}
@Override
public void onComplete(Long timestamp) {
long receiveTime = System.currentTimeMillis();
- cache.cacheConfigNodeHeartbeatSample(nodeId, new
NodeHeartbeatSample(timestamp, receiveTime));
+ loadCache.cacheConfigNodeHeartbeatSample(
+ nodeId, new NodeHeartbeatSample(timestamp, receiveTime));
}
@Override
public void onError(Exception e) {
- // Do nothing
+ if (ThriftClient.isConnectionBroken(e)) {
+ loadCache.forceUpdateNodeCache(
+ NodeType.ConfigNode,
+ nodeId,
+ NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
+ }
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index d054478d877..33ae5ca3835 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.confignode.client.async.handlers.heartbeat;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.NodeType;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
@@ -121,6 +124,9 @@ public class DataNodeHeartbeatHandler implements
AsyncMethodCallback<THeartbeatR
@Override
public void onError(Exception e) {
- // Do nothing
+ if (ThriftClient.isConnectionBroken(e)) {
+ loadCache.forceUpdateNodeCache(
+ NodeType.DataNode, nodeId,
NodeHeartbeatSample.generateDefaultSample(NodeStatus.Unknown));
+ }
}
}