This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 9d103de9d15 [RTO/RPO] Add Phi Accrual for Node failure detection
(#14866) (#14987)
9d103de9d15 is described below
commit 9d103de9d15dfbb17d374876bf65bfe49b752143
Author: William Song <[email protected]>
AuthorDate: Fri Feb 28 18:41:54 2025 +0800
[RTO/RPO] Add Phi Accrual for Node failure detection (#14866) (#14987)
* add phi accrual for node failure detection
* fix unit error
* address review issues
* address review issues
* address review issues
* address review issues
* address review issues
---
iotdb-core/confignode/pom.xml | 4 +
.../iotdb/confignode/conf/ConfigNodeConfig.java | 45 +++++
.../confignode/conf/ConfigNodeDescriptor.java | 30 ++++
.../manager/load/cache/AbstractLoadCache.java | 26 ++-
.../manager/load/cache/IFailureDetector.java | 39 +++++
.../manager/load/cache/detector/FixedDetector.java | 58 +++++++
.../load/cache/detector/PhiAccrualDetector.java | 181 +++++++++++++++++++++
.../load/cache/node/AINodeHeartbeatCache.java | 15 +-
.../load/cache/node/ConfigNodeHeartbeatCache.java | 12 +-
.../load/cache/node/DataNodeHeartbeatCache.java | 11 +-
.../manager/load/cache/region/RegionCache.java | 12 +-
.../manager/load/cache/detector/DetectorTest.java | 179 ++++++++++++++++++++
.../conf/iotdb-system.properties.template | 20 +++
13 files changed, 615 insertions(+), 17 deletions(-)
diff --git a/iotdb-core/confignode/pom.xml b/iotdb-core/confignode/pom.xml
index dd30ea1becd..083c87a8531 100644
--- a/iotdb-core/confignode/pom.xml
+++ b/iotdb-core/confignode/pom.xml
@@ -160,6 +160,10 @@
<version>1.3.0</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math3</artifactId>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index ffb61da07e1..402f27ef5d8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
import
org.apache.iotdb.confignode.manager.load.balancer.router.leader.AbstractLeaderBalancer;
import
org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
import
org.apache.iotdb.confignode.manager.partition.RegionGroupExtensionPolicy;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
@@ -180,6 +181,18 @@ public class ConfigNodeConfig {
/** The heartbeat interval in milliseconds. */
private long heartbeatIntervalInMs = 1000;
+ /** Failure detector implementation */
+ private String failureDetector = IFailureDetector.PHI_ACCRUAL_DETECTOR;
+
+ /** Max heartbeat elapsed time threshold for Fixed failure detector */
+ private long failureDetectorFixedThresholdInMs = 20000;
+
+ /** Max threshold for Phi accrual failure detector */
+ private long failureDetectorPhiThreshold = 30;
+
+ /** Acceptable pause duration for Phi accrual failure detector */
+ private long failureDetectorPhiAcceptablePauseInMs = 10000;
+
/** The unknown DataNode detect interval in milliseconds. */
private long unknownDataNodeDetectInterval = heartbeatIntervalInMs;
@@ -1216,4 +1229,36 @@ public class ConfigNodeConfig {
|| (TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
&&
getDataRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS));
}
+
+ /**
+  * Returns the name of the failure detector implementation in use. The field defaults to
+  * IFailureDetector.PHI_ACCRUAL_DETECTOR.
+  */
+ public String getFailureDetector() {
+ return failureDetector;
+ }
+
+ /** Sets the name of the failure detector implementation; no validation is performed here. */
+ public void setFailureDetector(String failureDetector) {
+ this.failureDetector = failureDetector;
+ }
+
+ /** Returns the max heartbeat elapsed time, in milliseconds, for the fixed failure detector. */
+ public long getFailureDetectorFixedThresholdInMs() {
+ return failureDetectorFixedThresholdInMs;
+ }
+
+ /** Sets the max heartbeat elapsed time, in milliseconds, for the fixed failure detector. */
+ public void setFailureDetectorFixedThresholdInMs(long
failureDetectorFixedThresholdInMs) {
+ this.failureDetectorFixedThresholdInMs = failureDetectorFixedThresholdInMs;
+ }
+
+ /** Returns the max phi threshold used by the phi accrual failure detector. */
+ public long getFailureDetectorPhiThreshold() {
+ return failureDetectorPhiThreshold;
+ }
+
+ /** Sets the max phi threshold used by the phi accrual failure detector. */
+ public void setFailureDetectorPhiThreshold(long failureDetectorPhiThreshold)
{
+ this.failureDetectorPhiThreshold = failureDetectorPhiThreshold;
+ }
+
+ /** Returns the acceptable heartbeat pause, in milliseconds, for the phi accrual detector. */
+ public long getFailureDetectorPhiAcceptablePauseInMs() {
+ return failureDetectorPhiAcceptablePauseInMs;
+ }
+
+ /** Sets the acceptable heartbeat pause, in milliseconds, for the phi accrual detector. */
+ public void setFailureDetectorPhiAcceptablePauseInMs(long
failureDetectorPhiAcceptablePauseInMs) {
+ this.failureDetectorPhiAcceptablePauseInMs =
failureDetectorPhiAcceptablePauseInMs;
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 8369998f966..b9efee34e28 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
import
org.apache.iotdb.confignode.manager.load.balancer.router.leader.AbstractLeaderBalancer;
import
org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
import
org.apache.iotdb.confignode.manager.partition.RegionGroupExtensionPolicy;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.utils.NodeType;
@@ -319,6 +320,35 @@ public class ConfigNodeDescriptor {
"heartbeat_interval_in_ms",
String.valueOf(conf.getHeartbeatIntervalInMs()))
.trim()));
+ String failureDetector = properties.getProperty("failure_detector",
conf.getFailureDetector());
+ if (IFailureDetector.FIXED_DETECTOR.equals(failureDetector)
+ || IFailureDetector.PHI_ACCRUAL_DETECTOR.equals(failureDetector)) {
+ conf.setFailureDetector(failureDetector);
+ } else {
+ throw new IOException(
+ String.format(
+ "Unknown failure_detector: %s, " + "please set to \"fixed\" or
\"phi_accrual\"",
+ failureDetector));
+ }
+
+ conf.setFailureDetectorFixedThresholdInMs(
+ Long.parseLong(
+ properties.getProperty(
+ "failure_detector_fixed_threshold_in_ms",
+ String.valueOf(conf.getFailureDetectorFixedThresholdInMs()))));
+
+ conf.setFailureDetectorPhiThreshold(
+ Long.parseLong(
+ properties.getProperty(
+ "failure_detector_phi_threshold",
+ String.valueOf(conf.getFailureDetectorPhiThreshold()))));
+
+ conf.setFailureDetectorPhiAcceptablePauseInMs(
+ Long.parseLong(
+ properties.getProperty(
+ "failure_detector_phi_acceptable_pause_in_ms",
+
String.valueOf(conf.getFailureDetectorPhiAcceptablePauseInMs()))));
+
String leaderDistributionPolicy =
properties
.getProperty("leader_distribution_policy",
conf.getLeaderDistributionPolicy())
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
index 856b891c540..6355dfdcc82 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
@@ -19,6 +19,11 @@
package org.apache.iotdb.confignode.manager.load.cache;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.load.cache.detector.FixedDetector;
+import
org.apache.iotdb.confignode.manager.load.cache.detector.PhiAccrualDetector;
+
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -32,17 +37,34 @@ public abstract class AbstractLoadCache {
// Max heartbeat cache samples store size
private static final int MAXIMUM_WINDOW_SIZE = 100;
- // The Status will be set to Unknown when the response time of heartbeat is
more than 20s
- protected static final long HEARTBEAT_TIMEOUT_TIME_IN_NS = 20_000_000_000L;
// Caching the recent MAXIMUM_WINDOW_SIZE heartbeat sample
protected final List<AbstractHeartbeatSample> slidingWindow;
// The current statistics calculated by the latest heartbeat sample
protected final AtomicReference<AbstractStatistics> currentStatistics;
+ protected final IFailureDetector failureDetector;
+
+ private static final ConfigNodeConfig CONF =
ConfigNodeDescriptor.getInstance().getConf();
+
+ /**
+  * Initializes the statistics holder, the synchronized sliding window and the failure detector
+  * selected by the configuration.
+  */
protected AbstractLoadCache() {
this.currentStatistics = new AtomicReference<>();
this.slidingWindow = Collections.synchronizedList(new LinkedList<>());
+ // Configured durations are in milliseconds while the detectors take nanoseconds,
+ // hence the multiplications by 1000_000L below.
+ switch (CONF.getFailureDetector()) {
+ case IFailureDetector.PHI_ACCRUAL_DETECTOR:
+ // Arguments, in order: phi threshold, acceptable heartbeat pause (ns), minimum standard
+ // deviation (heartbeat interval in ms * 200_000, i.e. 20% of the interval in ns),
+ // number of samples to collect before phi is used, and the fixed detector that decides
+ // during that cold start.
+ this.failureDetector =
+ new PhiAccrualDetector(
+ CONF.getFailureDetectorPhiThreshold(),
+ CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L,
+ CONF.getHeartbeatIntervalInMs() * 200_000L,
+ 60,
+ new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs()
* 1000_000L));
+ break;
+ case IFailureDetector.FIXED_DETECTOR:
+ default:
+ // Any value other than the phi accrual name ends up with the fixed detector.
+ this.failureDetector =
+ new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() *
1000_000L);
+ }
}
/**
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/IFailureDetector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/IFailureDetector.java
new file mode 100644
index 00000000000..be6a8f621ba
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/IFailureDetector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.cache;
+
+import java.util.List;
+
+/**
+ * IFailureDetector is the judge for node status (UNKNOWN). {@link #isAvailable(List)} will be
+ * called at each fixed interval to update the node status.
+ */
+public interface IFailureDetector {
+ /** Name that selects the fixed-timeout detector. */
+ String FIXED_DETECTOR = "fixed";
+ /** Name that selects the phi accrual detector. */
+ String PHI_ACCRUAL_DETECTOR = "phi_accrual";
+
+ /**
+ * Given the heartbeat history, decide whether this endpoint is still available.
+ *
+ * @param history heartbeat history
+ * @return false if the endpoint is under failure
+ */
+ boolean isAvailable(List<AbstractHeartbeatSample> history);
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/FixedDetector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/FixedDetector.java
new file mode 100644
index 00000000000..35fb4e3f2a1
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/FixedDetector.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.cache.detector;
+
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+
+import org.apache.tsfile.utils.Preconditions;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * FixedDetector decides that a node is unknown if and only if the time elapsed since its last
+ * heartbeat exceeds {@code heartbeatTimeoutNs}.
+ */
+public class FixedDetector implements IFailureDetector {
+  /** Longest tolerated silence since the last heartbeat, in nanoseconds. */
+  private final long heartbeatTimeoutNs;
+
+  /**
+   * @param heartbeatTimeoutNs longest tolerated silence since the last heartbeat, in nanoseconds
+   */
+  public FixedDetector(long heartbeatTimeoutNs) {
+    this.heartbeatTimeoutNs = heartbeatTimeoutNs;
+  }
+
+  /**
+   * Checks whether the most recent heartbeat is recent enough. The last element of {@code
+   * history} is treated as the most recent sample.
+   *
+   * @param history heartbeat history
+   * @return {@code true} if the last sample is at most {@code heartbeatTimeoutNs} old; {@code
+   *     false} if it is older or if no heartbeat has been recorded yet
+   * @throws IllegalArgumentException if the last sample is neither a node nor a region sample
+   */
+  @Override
+  public boolean isAvailable(List<AbstractHeartbeatSample> history) {
+    // Read the history exactly once so that the emptiness check and the element access cannot
+    // disagree when the caller hands in a view of a list that is still being modified.
+    final AbstractHeartbeatSample[] samples = history.toArray(new AbstractHeartbeatSample[0]);
+    if (samples.length == 0) {
+      // System.nanoTime() has an arbitrary origin (its values may even be negative), so there is
+      // no meaningful "zero" timestamp to compare against. An endpoint that has never produced a
+      // heartbeat is reported as not available.
+      return false;
+    }
+    final AbstractHeartbeatSample lastSample = samples[samples.length - 1];
+    // Only these sample types are accepted; their logical timestamp is expected to be a
+    // System.nanoTime() value, which the subtraction below relies on.
+    Preconditions.checkArgument(
+        lastSample instanceof NodeHeartbeatSample || lastSample instanceof RegionHeartbeatSample);
+    return System.nanoTime() - lastSample.getSampleLogicalTimestamp() <= heartbeatTimeoutNs;
+  }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
new file mode 100644
index 00000000000..a4947e8a3b0
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.cache.detector;
+
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.tsfile.utils.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Phi Accrual Failure Detector, proposed by Hayashibara, Naohiro, et al. in "The φ accrual
+ * failure detector". It is an accrual approach based on heartbeat history analysis with dynamic
+ * sensitivity and a tunable threshold. It is adaptive with early failure detection, increased
+ * accuracy and improved system stability.
+ *
+ * <p>Initially, Phi has a cold start period during which it only collects heartbeat samples and
+ * delegates decision-making to a fallback detector such as {@link FixedDetector}. After
+ * collecting enough samples, it starts failure detection using the Phi algorithm.
+ */
+public class PhiAccrualDetector implements IFailureDetector {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PhiAccrualDetector.class);
+
+  /** Two samples are the minimum needed to derive a single heartbeat interval. */
+  private static final int MIN_SAMPLES_FOR_PHI = 2;
+
+  private static final long NANOS_PER_MILLI = 1000_000L;
+
+  /** The endpoint is reported unavailable once phi reaches this value. */
+  private final long threshold;
+
+  /** Tolerance added to the mean heartbeat interval, in nanoseconds. */
+  private final long acceptableHeartbeatPauseNs;
+
+  /** Lower bound applied to the standard deviation of the intervals, in nanoseconds. */
+  private final long minHeartbeatStdNs;
+
+  /** Number of samples below which the fallback detector decides. */
+  private final int coldStartSampleCount;
+
+  /** Detector consulted while there are too few samples for phi. */
+  private final IFailureDetector fallbackDuringColdStart;
+
+  /**
+   * @param threshold phi value at or above which the endpoint is considered unavailable
+   * @param acceptableHeartbeatPauseNs tolerance added to the mean heartbeat interval, in ns
+   * @param minHeartbeatStdNs lower bound for the standard deviation of the intervals, in ns
+   * @param minimalSampleCount number of samples below which the fallback detector is used
+   * @param fallbackDuringColdStart detector used while there are too few samples
+   */
+  public PhiAccrualDetector(
+      long threshold,
+      long acceptableHeartbeatPauseNs,
+      long minHeartbeatStdNs,
+      int minimalSampleCount,
+      IFailureDetector fallbackDuringColdStart) {
+    this.threshold = threshold;
+    this.acceptableHeartbeatPauseNs = acceptableHeartbeatPauseNs;
+    this.minHeartbeatStdNs = minHeartbeatStdNs;
+    this.coldStartSampleCount = minimalSampleCount;
+    this.fallbackDuringColdStart = fallbackDuringColdStart;
+  }
+
+  /**
+   * Decides availability from the phi value of the heartbeat history. While the history holds
+   * fewer samples than the cold-start count (or fewer than two, since no interval can be derived
+   * from a single sample), the decision is delegated to the fallback detector.
+   *
+   * @param history heartbeat history
+   * @return {@code true} if phi is below the threshold (or the fallback detector says so)
+   */
+  @Override
+  public boolean isAvailable(List<AbstractHeartbeatSample> history) {
+    // Work on a private copy: callers may hand in a view of a list that other threads keep
+    // modifying, and iterating such a view without holding its lock is unsafe.
+    final List<AbstractHeartbeatSample> snapshot = new ArrayList<>(history);
+    if (snapshot.size() < Math.max(coldStartSampleCount, MIN_SAMPLES_FOR_PHI)) {
+      /* We haven't received enough heartbeat replies. */
+      return fallbackDuringColdStart.isAvailable(snapshot);
+    }
+    final PhiAccrual phiAccrual = create(snapshot);
+    final boolean isAvailable = phiAccrual.phi() < (double) this.threshold;
+    if (!isAvailable) {
+      // log the status change and dump the heartbeat history for analysis use
+      final StringBuilder builder = new StringBuilder("[");
+      for (final double interval : phiAccrual.heartbeatIntervals) {
+        builder.append((long) interval / NANOS_PER_MILLI).append(", ");
+      }
+      builder.append(phiAccrual.timeElapsedSinceLastHeartbeat / NANOS_PER_MILLI).append("]");
+      LOGGER.info("Node Down, heartbeat history (ms): {}", builder);
+    }
+
+    return isAvailable;
+  }
+
+  /**
+   * Builds the phi calculator from the intervals between consecutive samples and the time
+   * elapsed since the last sample.
+   *
+   * @param history heartbeat history; must contain at least two samples
+   * @return the phi calculator for this history
+   * @throws IllegalArgumentException if a sample is neither a node nor a region sample, or if
+   *     fewer than two samples are given
+   */
+  PhiAccrual create(List<AbstractHeartbeatSample> history) {
+    final List<Double> heartbeatIntervals = new ArrayList<>();
+
+    // Track the previous sample itself rather than a sentinel timestamp: System.nanoTime() may
+    // legitimately return any long value.
+    AbstractHeartbeatSample previous = null;
+    for (final AbstractHeartbeatSample sample : history) {
+      // ensure getSampleLogicalTimestamp() will return system nano timestamp
+      Preconditions.checkArgument(
+          sample instanceof NodeHeartbeatSample || sample instanceof RegionHeartbeatSample);
+      if (previous != null) {
+        // Subtract as long first, then widen, so that large nanoTime values lose no precision.
+        heartbeatIntervals.add(
+            (double)
+                (sample.getSampleLogicalTimestamp() - previous.getSampleLogicalTimestamp()));
+      }
+      previous = sample;
+    }
+    final long lastHeartbeatTimestamp = history.get(history.size() - 1).getSampleLogicalTimestamp();
+    final long timeElapsedSinceLastHeartbeat = System.nanoTime() - lastHeartbeatTimestamp;
+
+    final double[] intervalArray =
+        heartbeatIntervals.stream().mapToDouble(Double::doubleValue).toArray();
+    return new PhiAccrual(
+        intervalArray,
+        timeElapsedSinceLastHeartbeat,
+        minHeartbeatStdNs,
+        acceptableHeartbeatPauseNs);
+  }
+
+  /**
+   * The φ Accrual Failure Detector implementation. See <a
+   * href="https://doc.akka.io/libraries/akka-core/current/typed/failure-detector.html">φ
+   * Accrual</a>
+   */
+  static final class PhiAccrual {
+    /*
+     * All the heartbeat related intervals within this class should be calculated in unit of
+     * nanoseconds
+     */
+    private final double[] heartbeatIntervals;
+    private final long timeElapsedSinceLastHeartbeat;
+    private final long minHeartbeatStd;
+    private final long acceptableHeartbeatPause;
+
+    /**
+     * @param heartbeatIntervals intervals between consecutive heartbeats; must not be empty
+     * @param timeElapsedSinceLastHeartbeat time since the last heartbeat; must not be negative
+     * @param minHeartbeatStd lower bound applied to the standard deviation of the intervals
+     * @param acceptableHeartbeatPause tolerance added to the mean interval
+     */
+    PhiAccrual(
+        double[] heartbeatIntervals,
+        long timeElapsedSinceLastHeartbeat,
+        long minHeartbeatStd,
+        long acceptableHeartbeatPause) {
+      Preconditions.checkArgument(heartbeatIntervals.length > 0);
+      Preconditions.checkArgument(timeElapsedSinceLastHeartbeat >= 0);
+      this.heartbeatIntervals = heartbeatIntervals;
+      this.timeElapsedSinceLastHeartbeat = timeElapsedSinceLastHeartbeat;
+      this.minHeartbeatStd = minHeartbeatStd;
+      this.acceptableHeartbeatPause = acceptableHeartbeatPause;
+    }
+
+    /**
+     * @return phi value given the heartbeat interval history
+     */
+    double phi() {
+      final DescriptiveStatistics ds = new DescriptiveStatistics(heartbeatIntervals);
+      double mean = ds.getMean();
+      double std = ds.getStandardDeviation();
+
+      /* ensure the std is valid */
+      std = Math.max(std, minHeartbeatStd);
+
+      /* add tolerance specified by acceptableHeartbeatPause */
+      mean += acceptableHeartbeatPause;
+
+      return p(timeElapsedSinceLastHeartbeat, mean, std);
+    }
+
+    /**
+     * Core method for calculating the phi φ coefficient. It uses a logistic approximation to the
+     * cumulative normal distribution.
+     *
+     * @param elapsedTime the difference of the times (current - last heartbeat timestamp)
+     * @param historyMean the mean of the history distribution
+     * @param historyStd the standard deviation of the history distribution
+     * @return The value of the φ
+     */
+    private double p(double elapsedTime, double historyMean, double historyStd) {
+      final double y = (elapsedTime - historyMean) / historyStd;
+      /* Math.exp will return Double.POSITIVE_INFINITY SAFELY when overflows. */
+      double e = Math.exp(-y * (1.5976 + 0.070566 * y * y));
+      if (elapsedTime > historyMean) {
+        return -Math.log10(e / (1.0 + e));
+      } else {
+        return -Math.log10(1.0 - 1.0 / (1.0 + e));
+      }
+    }
+  }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/AINodeHeartbeatCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/AINodeHeartbeatCache.java
index 50ad19c530e..d60f3195ff9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/AINodeHeartbeatCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/AINodeHeartbeatCache.java
@@ -21,7 +21,10 @@ package org.apache.iotdb.confignode.manager.load.cache.node;
import org.apache.iotdb.common.rpc.thrift.TLoadSample;
import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
public class AINodeHeartbeatCache extends BaseNodeCache {
@@ -35,13 +38,12 @@ public class AINodeHeartbeatCache extends BaseNodeCache {
@Override
public void updateCurrentStatistics(boolean forceUpdate) {
- NodeHeartbeatSample lastSample = null;
+ NodeHeartbeatSample lastSample;
+ final List<AbstractHeartbeatSample> heartbeatHistory;
synchronized (slidingWindow) {
- if (!slidingWindow.isEmpty()) {
- lastSample = (NodeHeartbeatSample) getLastSample();
- }
+ lastSample = (NodeHeartbeatSample) getLastSample();
+ heartbeatHistory = Collections.unmodifiableList(slidingWindow);
}
- long lastSendTime = lastSample == null ? 0 :
lastSample.getSampleLogicalTimestamp();
/* Update load sample */
if (lastSample != null && lastSample.isSetLoadSample()) {
@@ -54,7 +56,8 @@ public class AINodeHeartbeatCache extends BaseNodeCache {
long currentNanoTime = System.nanoTime();
if (lastSample != null &&
NodeStatus.Removing.equals(lastSample.getStatus())) {
status = NodeStatus.Removing;
- } else if (currentNanoTime - lastSendTime > HEARTBEAT_TIMEOUT_TIME_IN_NS) {
+ } else if (!failureDetector.isAvailable(heartbeatHistory)) {
+ /* Failure detector decides that this AINode is UNKNOWN */
status = NodeStatus.Unknown;
} else if (lastSample != null) {
status = lastSample.getStatus();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
index d6a41285c54..1f6eda65f50 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
@@ -21,6 +21,10 @@ package org.apache.iotdb.confignode.manager.load.cache.node;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+
+import java.util.Collections;
+import java.util.List;
/** Heartbeat cache for cluster ConfigNodes. */
public class ConfigNodeHeartbeatCache extends BaseNodeCache {
@@ -51,18 +55,20 @@ public class ConfigNodeHeartbeatCache extends BaseNodeCache
{
}
NodeHeartbeatSample lastSample;
+ final List<AbstractHeartbeatSample> heartbeatHistory;
synchronized (slidingWindow) {
lastSample = (NodeHeartbeatSample) getLastSample();
+ heartbeatHistory = Collections.unmodifiableList(slidingWindow);
}
- long lastSendTime = lastSample == null ? 0 :
lastSample.getSampleLogicalTimestamp();
// Update Node status
NodeStatus status;
long currentNanoTime = System.nanoTime();
if (lastSample == null) {
+ /* First heartbeat not received from this ConfigNode, status is UNKNOWN
*/
status = NodeStatus.Unknown;
- } else if (currentNanoTime - lastSendTime > HEARTBEAT_TIMEOUT_TIME_IN_NS) {
- // TODO: Optimize Unknown judge logic
+ } else if (!failureDetector.isAvailable(heartbeatHistory)) {
+ /* Failure detector decides that this ConfigNode is UNKNOWN */
status = NodeStatus.Unknown;
} else {
status = lastSample.getStatus();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
index 6654d3f1f15..a5293d8b397 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
@@ -21,10 +21,13 @@ package org.apache.iotdb.confignode.manager.load.cache.node;
import org.apache.iotdb.common.rpc.thrift.TLoadSample;
import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
/** Heartbeat cache for cluster DataNodes. */
@@ -49,10 +52,11 @@ public class DataNodeHeartbeatCache extends BaseNodeCache {
}
NodeHeartbeatSample lastSample;
+ final List<AbstractHeartbeatSample> heartbeatHistory;
synchronized (slidingWindow) {
lastSample = (NodeHeartbeatSample) getLastSample();
+ heartbeatHistory = Collections.unmodifiableList(slidingWindow);
}
- long lastSendTime = lastSample == null ? 0 :
lastSample.getSampleLogicalTimestamp();
/* Update load sample */
if (lastSample != null && lastSample.isSetLoadSample()) {
@@ -64,9 +68,10 @@ public class DataNodeHeartbeatCache extends BaseNodeCache {
String statusReason = null;
long currentNanoTime = System.nanoTime();
if (lastSample == null) {
+ /* First heartbeat not received from this DataNode, status is UNKNOWN */
status = NodeStatus.Unknown;
- } else if (currentNanoTime - lastSendTime > HEARTBEAT_TIMEOUT_TIME_IN_NS) {
- // TODO: Optimize Unknown judge logic
+ } else if (!failureDetector.isAvailable(heartbeatHistory)) {
+ /* Failure detector decides that this DataNode is UNKNOWN */
status = NodeStatus.Unknown;
} else {
status = lastSample.getStatus();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
index ecb4aebd10b..b2eb95845f9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
@@ -20,8 +20,12 @@
package org.apache.iotdb.confignode.manager.load.cache.region;
import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
import org.apache.iotdb.confignode.manager.load.cache.AbstractLoadCache;
+import java.util.Collections;
+import java.util.List;
+
/**
* RegionCache caches the RegionHeartbeatSamples of a Region. Update and cache
the current
* statistics of the Region based on the latest RegionHeartbeatSample.
@@ -36,17 +40,19 @@ public class RegionCache extends AbstractLoadCache {
@Override
public synchronized void updateCurrentStatistics(boolean forceUpdate) {
RegionHeartbeatSample lastSample;
+ List<AbstractHeartbeatSample> history;
synchronized (slidingWindow) {
lastSample = (RegionHeartbeatSample) getLastSample();
+ history = Collections.unmodifiableList(slidingWindow);
}
RegionStatus status;
long currentNanoTime = System.nanoTime();
if (lastSample == null) {
+ /* First heartbeat not received from this region, status is UNKNOWN */
status = RegionStatus.Unknown;
- } else if (currentNanoTime - lastSample.getSampleLogicalTimestamp()
- > HEARTBEAT_TIMEOUT_TIME_IN_NS) {
- // TODO: Optimize Unknown judge logic
+ } else if (!failureDetector.isAvailable(history)) {
+ /* Failure detector decides that this region is UNKNOWN */
status = RegionStatus.Unknown;
} else {
status = lastSample.getStatus();
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/detector/DetectorTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/detector/DetectorTest.java
new file mode 100644
index 00000000000..a56a7166646
--- /dev/null
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/detector/DetectorTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.cache.detector;
+
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+public class DetectorTest {
+
+ final long sec = 1_000_000_000L;
+ final FixedDetector fixedDetector = new FixedDetector(20 * sec);
+ final PhiAccrualDetector phiAccrualDetector =
+ new PhiAccrualDetector(30, 10 * sec, (long) (0.2 * sec), 0,
fixedDetector);
+
+ private double getPhi(long elapsed, double[] intervals, long minStd, long
pause) {
+ final PhiAccrualDetector.PhiAccrual p =
+ new PhiAccrualDetector.PhiAccrual(intervals, elapsed, minStd, pause);
+ return p.phi();
+ }
+
+ private void assertInRange(double value, double start, double end) {
+ Assert.assertTrue(value > start);
+ Assert.assertTrue(value < end);
+ }
+
+ @Test
+ public void testFixedDetector() {
+ final long lastHeartbeatTs = System.nanoTime() - 21 * sec;
+ final List<AbstractHeartbeatSample> history =
+ Collections.singletonList(new NodeHeartbeatSample(lastHeartbeatTs,
NodeStatus.Running));
+ Assert.assertFalse(fixedDetector.isAvailable(history));
+
+ final long lastAvailableHeartbeat = System.nanoTime() - 18 * sec;
+ final List<AbstractHeartbeatSample> history2 =
+ Collections.singletonList(
+ new NodeHeartbeatSample(lastAvailableHeartbeat,
NodeStatus.Running));
+ Assert.assertTrue(fixedDetector.isAvailable(history2));
+ }
+
+ @Test
+ public void testPhiCalculation1() {
+ /* (min, std, acceptable_pause) = (1000, 200, 0) */
+ final double[] heartbeatIntervals = {1000, 1000, 1000, 1000, 1000};
+ final long minStd = 200;
+ final long pause = 0;
+
+ assertInRange(getPhi(1000, heartbeatIntervals, minStd, pause), 0, 1);
+ assertInRange(getPhi(2000, heartbeatIntervals, minStd, pause), 5, 10);
+ assertInRange(getPhi(3000, heartbeatIntervals, minStd, pause), 35, 50);
+ }
+
+ @Test
+ public void testPhiCalculation2() {
+ /* (min, std, acceptable_pause) = (1000, 300, 0) */
+ final double[] heartbeatIntervals = {1000, 1000, 1000, 1000, 1000};
+ final long minStd = 300;
+ final long pause = 0;
+
+ assertInRange(getPhi(1000, heartbeatIntervals, minStd, pause), 0, 1);
+ assertInRange(getPhi(2000, heartbeatIntervals, minStd, pause), 1, 5);
+ assertInRange(getPhi(3000, heartbeatIntervals, minStd, pause), 10, 15);
+ }
+
+ @Test
+ public void testPhiCalculation3() {
+ /* (min, std, acceptable_pause) = (1000, 200, 3000) */
+ final double[] heartbeatIntervals = {1000, 1000, 1000, 1000, 1000};
+ final long minStd = 200;
+ final long pause = 3000;
+
+ assertInRange(getPhi(1000 + pause, heartbeatIntervals, minStd, pause), 0,
1);
+ assertInRange(getPhi(2000 + pause, heartbeatIntervals, minStd, pause), 5,
10);
+ assertInRange(getPhi(3000 + pause, heartbeatIntervals, minStd, pause), 35,
50);
+ }
+
+ /**
+ * When a node hasn't responded with interval longer than accepted GC pause
Phi Accrual can detect
+ * the problem quicker than Fix In this case, the accepted pause is 10s, but
we haven't received
+ * heartbeats for 13s
+ */
+ @Test
+ public void testComparisonQuickFailureDetection() {
+ long[] interval = new long[] {sec, sec, sec};
+ List<AbstractHeartbeatSample> history = fromInterval(interval, 13 * sec);
+ Assert.assertTrue(fixedDetector.isAvailable(history));
+ Assert.assertFalse(phiAccrualDetector.isAvailable(history));
+ }
+
+ /**
+ * When the system load is high, we may observe exceptionally long GC pause
The first
+ * exceptionally long GC pause will be a false positive to Phi the GC pause
is 15 (longer than the
+ * expected 10s) Phi will report false positive
+ */
+ @Test
+ public void testFalsePositiveOnExceptionallyLongGCPause() {
+ long[] interval = new long[] {sec, sec, sec};
+ long gcPause = 15 * sec;
+ List<AbstractHeartbeatSample> history = fromInterval(interval, gcPause + 2
* sec);
+ Assert.assertTrue(fixedDetector.isAvailable(history));
+ Assert.assertFalse(phiAccrualDetector.isAvailable(history));
+ }
+
+ /**
+ * When the system load is high, we may observe exceptionally long GC pause
If the GC pause is
+ * very often, Phi can be adaptive to the new environment in this case,
there are 2 long GC pause
+ * in history when a new GC with 21s pause occurs, Phi takes it normal while
Fixed will fail.
+ */
+ @Test
+ public void testPhiAdaptionToFrequentGCPause() {
+ long[] interval =
+ new long[] {
+ sec,
+ sec,
+ sec,
+ 15 * sec,
+ (long) (0.1 * sec),
+ sec,
+ sec,
+ sec,
+ 15 * sec,
+ (long) (0.1 * sec),
+ sec,
+ sec
+ };
+ List<AbstractHeartbeatSample> history = fromInterval(interval, 21 * sec);
+ Assert.assertFalse(fixedDetector.isAvailable(history));
+ Assert.assertTrue(phiAccrualDetector.isAvailable(history));
+ }
+
+ /**
+ * If the Phi detector haven't received enough samples its behavior will
fall back to Fix detector
+ */
+ @Test
+ public void testColdStart() {
+ final PhiAccrualDetector coldStartPhi =
+ new PhiAccrualDetector(30, 10 * sec, (long) (0.2 * sec), 60,
fixedDetector);
+ long[] interval = new long[] {sec, sec, sec};
+ List<AbstractHeartbeatSample> history = fromInterval(interval, 21 * sec);
+ Assert.assertFalse(fixedDetector.isAvailable(history));
+ Assert.assertFalse(coldStartPhi.isAvailable(history));
+ }
+
+ private List<AbstractHeartbeatSample> fromInterval(long[] interval, long
timeElapsed) {
+ long now = System.nanoTime();
+ long begin = now - timeElapsed;
+ List<AbstractHeartbeatSample> sample = new LinkedList<>();
+ sample.add(new NodeHeartbeatSample(begin, NodeStatus.Running));
+ for (int i = interval.length - 1; i >= 0; i--) {
+ begin -= interval[i];
+ sample.add(0, new NodeHeartbeatSample(begin, NodeStatus.Running));
+ }
+ return sample;
+ }
+}
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index cbd09fb784d..a853a2dc8a4 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -670,6 +670,26 @@ time_partition_interval=604800000
# Datatype: long
heartbeat_interval_in_ms=1000
+# Failure detector to use, one of {fixed, phi_accrual}
+# effectiveMode: restart
+# Datatype: string
+failure_detector=phi_accrual
+
+# Fixed failure detector: threshold of time elapsed without receiving heartbeat replies
+# effectiveMode: restart
+# Datatype: long
+failure_detector_fixed_threshold_in_ms=20000
+
+# Phi accrual: threshold to determine a node is down
+# effectiveMode: restart
+# Datatype: long
+failure_detector_phi_threshold=30
+
+# Phi accrual: acceptable heartbeat pause time (e.g. due to JVM GC)
+# effectiveMode: restart
+# Datatype: long
+failure_detector_phi_acceptable_pause_in_ms=10000
+
# Disk remaining threshold at which DataNode is set to ReadOnly status
# effectiveMode: restart
# Datatype: double(percentage)