This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 9d103de9d15 [RTO/RPO] Add Phi Accrual for Node failure detection 
(#14866) (#14987)
9d103de9d15 is described below

commit 9d103de9d15dfbb17d374876bf65bfe49b752143
Author: William Song <[email protected]>
AuthorDate: Fri Feb 28 18:41:54 2025 +0800

    [RTO/RPO] Add Phi Accrual for Node failure detection (#14866) (#14987)
    
    * add phi accrual for node failure detection
    
    * fix unit error
    
    * address review issues
    
    * address review issues
    
    * address review issues
    
    * address review issues
    
    * address review issues
---
 iotdb-core/confignode/pom.xml                      |   4 +
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  45 +++++
 .../confignode/conf/ConfigNodeDescriptor.java      |  30 ++++
 .../manager/load/cache/AbstractLoadCache.java      |  26 ++-
 .../manager/load/cache/IFailureDetector.java       |  39 +++++
 .../manager/load/cache/detector/FixedDetector.java |  58 +++++++
 .../load/cache/detector/PhiAccrualDetector.java    | 181 +++++++++++++++++++++
 .../load/cache/node/AINodeHeartbeatCache.java      |  15 +-
 .../load/cache/node/ConfigNodeHeartbeatCache.java  |  12 +-
 .../load/cache/node/DataNodeHeartbeatCache.java    |  11 +-
 .../manager/load/cache/region/RegionCache.java     |  12 +-
 .../manager/load/cache/detector/DetectorTest.java  | 179 ++++++++++++++++++++
 .../conf/iotdb-system.properties.template          |  20 +++
 13 files changed, 615 insertions(+), 17 deletions(-)

diff --git a/iotdb-core/confignode/pom.xml b/iotdb-core/confignode/pom.xml
index dd30ea1becd..083c87a8531 100644
--- a/iotdb-core/confignode/pom.xml
+++ b/iotdb-core/confignode/pom.xml
@@ -160,6 +160,10 @@
             <version>1.3.0</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-math3</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index ffb61da07e1..402f27ef5d8 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
 import 
org.apache.iotdb.confignode.manager.load.balancer.router.leader.AbstractLeaderBalancer;
 import 
org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
 import 
org.apache.iotdb.confignode.manager.partition.RegionGroupExtensionPolicy;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
@@ -180,6 +181,18 @@ public class ConfigNodeConfig {
   /** The heartbeat interval in milliseconds. */
   private long heartbeatIntervalInMs = 1000;
 
+  /** Failure detector implementation */
+  private String failureDetector = IFailureDetector.PHI_ACCRUAL_DETECTOR;
+
+  /** Max heartbeat elapsed time threshold for Fixed failure detector */
+  private long failureDetectorFixedThresholdInMs = 20000;
+
+  /** Max threshold for Phi accrual failure detector */
+  private long failureDetectorPhiThreshold = 30;
+
+  /** Acceptable pause duration for Phi accrual failure detector */
+  private long failureDetectorPhiAcceptablePauseInMs = 10000;
+
   /** The unknown DataNode detect interval in milliseconds. */
   private long unknownDataNodeDetectInterval = heartbeatIntervalInMs;
 
@@ -1216,4 +1229,36 @@ public class ConfigNodeConfig {
         || (TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
             && 
getDataRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS));
   }
+
+  public String getFailureDetector() {
+    return failureDetector;
+  }
+
+  public void setFailureDetector(String failureDetector) {
+    this.failureDetector = failureDetector;
+  }
+
+  public long getFailureDetectorFixedThresholdInMs() {
+    return failureDetectorFixedThresholdInMs;
+  }
+
+  public void setFailureDetectorFixedThresholdInMs(long 
failureDetectorFixedThresholdInMs) {
+    this.failureDetectorFixedThresholdInMs = failureDetectorFixedThresholdInMs;
+  }
+
+  public long getFailureDetectorPhiThreshold() {
+    return failureDetectorPhiThreshold;
+  }
+
+  public void setFailureDetectorPhiThreshold(long failureDetectorPhiThreshold) 
{
+    this.failureDetectorPhiThreshold = failureDetectorPhiThreshold;
+  }
+
+  public long getFailureDetectorPhiAcceptablePauseInMs() {
+    return failureDetectorPhiAcceptablePauseInMs;
+  }
+
+  public void setFailureDetectorPhiAcceptablePauseInMs(long 
failureDetectorPhiAcceptablePauseInMs) {
+    this.failureDetectorPhiAcceptablePauseInMs = 
failureDetectorPhiAcceptablePauseInMs;
+  }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 8369998f966..b9efee34e28 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
 import 
org.apache.iotdb.confignode.manager.load.balancer.router.leader.AbstractLeaderBalancer;
 import 
org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
 import 
org.apache.iotdb.confignode.manager.partition.RegionGroupExtensionPolicy;
 import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
 import org.apache.iotdb.metrics.utils.NodeType;
@@ -319,6 +320,35 @@ public class ConfigNodeDescriptor {
                     "heartbeat_interval_in_ms", 
String.valueOf(conf.getHeartbeatIntervalInMs()))
                 .trim()));
 
+    String failureDetector = properties.getProperty("failure_detector", 
conf.getFailureDetector());
+    if (IFailureDetector.FIXED_DETECTOR.equals(failureDetector)
+        || IFailureDetector.PHI_ACCRUAL_DETECTOR.equals(failureDetector)) {
+      conf.setFailureDetector(failureDetector);
+    } else {
+      throw new IOException(
+          String.format(
+              "Unknown failure_detector: %s, " + "please set to \"fixed\" or 
\"phi_accrual\"",
+              failureDetector));
+    }
+
+    conf.setFailureDetectorFixedThresholdInMs(
+        Long.parseLong(
+            properties.getProperty(
+                "failure_detector_fixed_threshold_in_ms",
+                String.valueOf(conf.getFailureDetectorFixedThresholdInMs()))));
+
+    conf.setFailureDetectorPhiThreshold(
+        Long.parseLong(
+            properties.getProperty(
+                "failure_detector_phi_threshold",
+                String.valueOf(conf.getFailureDetectorPhiThreshold()))));
+
+    conf.setFailureDetectorPhiAcceptablePauseInMs(
+        Long.parseLong(
+            properties.getProperty(
+                "failure_detector_phi_acceptable_pause_in_ms",
+                
String.valueOf(conf.getFailureDetectorPhiAcceptablePauseInMs()))));
+
     String leaderDistributionPolicy =
         properties
             .getProperty("leader_distribution_policy", 
conf.getLeaderDistributionPolicy())
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
index 856b891c540..6355dfdcc82 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
@@ -19,6 +19,11 @@
 
 package org.apache.iotdb.confignode.manager.load.cache;
 
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.load.cache.detector.FixedDetector;
+import 
org.apache.iotdb.confignode.manager.load.cache.detector.PhiAccrualDetector;
+
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -32,17 +37,34 @@ public abstract class AbstractLoadCache {
 
   // Max heartbeat cache samples store size
   private static final int MAXIMUM_WINDOW_SIZE = 100;
-  // The Status will be set to Unknown when the response time of heartbeat is 
more than 20s
-  protected static final long HEARTBEAT_TIMEOUT_TIME_IN_NS = 20_000_000_000L;
 
   // Caching the recent MAXIMUM_WINDOW_SIZE heartbeat sample
   protected final List<AbstractHeartbeatSample> slidingWindow;
   // The current statistics calculated by the latest heartbeat sample
   protected final AtomicReference<AbstractStatistics> currentStatistics;
 
+  protected final IFailureDetector failureDetector;
+
+  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
+
   protected AbstractLoadCache() {
     this.currentStatistics = new AtomicReference<>();
     this.slidingWindow = Collections.synchronizedList(new LinkedList<>());
+    switch (CONF.getFailureDetector()) {
+      case IFailureDetector.PHI_ACCRUAL_DETECTOR:
+        this.failureDetector =
+            new PhiAccrualDetector(
+                CONF.getFailureDetectorPhiThreshold(),
+                CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L,
+                CONF.getHeartbeatIntervalInMs() * 200_000L,
+                60,
+                new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() 
* 1000_000L));
+        break;
+      case IFailureDetector.FIXED_DETECTOR:
+      default:
+        this.failureDetector =
+            new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * 
1000_000L);
+    }
   }
 
   /**
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/IFailureDetector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/IFailureDetector.java
new file mode 100644
index 00000000000..be6a8f621ba
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/IFailureDetector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.cache;
+
+import java.util.List;
+
+/**
+ * IFailureDetector is the judge for node status (UNKNOWN). {@link 
#isAvailable} is called at a
+ * fixed interval to update the node status.
+ */
+public interface IFailureDetector {
+  String FIXED_DETECTOR = "fixed";
+  String PHI_ACCRUAL_DETECTOR = "phi_accrual";
+
+  /**
+   * Given the heartbeat history, decide whether this endpoint is still 
available
+   *
+   * @param history heartbeat history
+   * @return false if the endpoint is under failure
+   */
+  boolean isAvailable(List<AbstractHeartbeatSample> history);
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/FixedDetector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/FixedDetector.java
new file mode 100644
index 00000000000..35fb4e3f2a1
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/FixedDetector.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.cache.detector;
+
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import 
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+
+import org.apache.tsfile.utils.Preconditions;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * FixedDetector decides that a node is unknown iff the time elapsed since its last 
heartbeat exceeds
+ * heartbeatTimeoutNs.
+ */
+public class FixedDetector implements IFailureDetector {
+  private final long heartbeatTimeoutNs;
+
+  public FixedDetector(long heartbeatTimeoutNs) {
+    this.heartbeatTimeoutNs = heartbeatTimeoutNs;
+  }
+
+  @Override
+  public boolean isAvailable(List<AbstractHeartbeatSample> history) {
+    final AbstractHeartbeatSample lastSample =
+        history.isEmpty() ? null : history.get(history.size() - 1);
+    if (lastSample != null) {
+      Preconditions.checkArgument(
+          lastSample instanceof NodeHeartbeatSample || lastSample instanceof 
RegionHeartbeatSample);
+    }
+    final long lastSendTime =
+        Optional.ofNullable(lastSample)
+            .map(AbstractHeartbeatSample::getSampleLogicalTimestamp)
+            .orElse(0L);
+    final long currentNanoTime = System.nanoTime();
+    return currentNanoTime - lastSendTime <= heartbeatTimeoutNs;
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
new file mode 100644
index 00000000000..a4947e8a3b0
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.cache.detector;
+
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import 
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.tsfile.utils.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Phi Failure Detector, proposed by Hayashibara, Naohiro, et al. "The φ 
accrual failure
+ * detector.". It is an accrual approach based on heartbeat history analysis 
with dynamic
+ * sensitivity and tunable threshold. It is adaptive with early failure 
detection, increased
+ * accuracy and improved system stability.
+ *
+ * <p>Initially, Phi has a cold start period during which it only collects 
heartbeat samples and
+ * falls back to {@link FixedDetector} for decision-making. After collecting enough 
samples, it starts
+ * failure detection using the Phi algorithm.
+ */
+public class PhiAccrualDetector implements IFailureDetector {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PhiAccrualDetector.class);
+  private final long threshold;
+  private final long acceptableHeartbeatPauseNs;
+  private final long minHeartbeatStdNs;
+  private final int codeStartSampleCount;
+  private final IFailureDetector fallbackDuringColdStart;
+
+  public PhiAccrualDetector(
+      long threshold,
+      long acceptableHeartbeatPauseNs,
+      long minHeartbeatStdNs,
+      int minimalSampleCount,
+      IFailureDetector fallbackDuringColdStart) {
+    this.threshold = threshold;
+    this.acceptableHeartbeatPauseNs = acceptableHeartbeatPauseNs;
+    this.minHeartbeatStdNs = minHeartbeatStdNs;
+    this.codeStartSampleCount = minimalSampleCount;
+    this.fallbackDuringColdStart = fallbackDuringColdStart;
+  }
+
+  @Override
+  public boolean isAvailable(List<AbstractHeartbeatSample> history) {
+    if (history.size() < codeStartSampleCount) {
+      /* We haven't received enough heartbeat replies.*/
+      return fallbackDuringColdStart.isAvailable(history);
+    }
+    final PhiAccrual phiAccrual = create(history);
+    final boolean isAvailable = phiAccrual.phi() < (double) this.threshold;
+    if (!isAvailable) {
+      // log the status change and dump the heartbeat history for analysis use
+      final StringBuilder builder = new StringBuilder();
+      builder.append("[");
+      for (double interval : phiAccrual.heartbeatIntervals) {
+        final long msInterval = (long) interval / 1000_000;
+        builder.append(msInterval).append(", ");
+      }
+      builder.append(phiAccrual.timeElapsedSinceLastHeartbeat / 1000_000);
+      builder.append("]");
+      LOGGER.info(String.format("Node Down, heartbeat history (ms): %s", 
builder));
+    }
+
+    return isAvailable;
+  }
+
+  PhiAccrual create(List<AbstractHeartbeatSample> history) {
+    final List<Double> heartbeatIntervals = new ArrayList<>();
+
+    long lastTs = -1;
+    for (final AbstractHeartbeatSample sample : history) {
+      // ensure getSampleLogicalTimestamp() will return system nano timestamp
+      Preconditions.checkArgument(
+          sample instanceof NodeHeartbeatSample || sample instanceof 
RegionHeartbeatSample);
+      if (lastTs == -1) {
+        lastTs = sample.getSampleLogicalTimestamp();
+        continue;
+      }
+      heartbeatIntervals.add((double) sample.getSampleLogicalTimestamp() - 
lastTs);
+      lastTs = sample.getSampleLogicalTimestamp();
+    }
+    final long lastHeartbeatTimestamp = history.get(history.size() - 
1).getSampleLogicalTimestamp();
+    final long timeElapsedSinceLastHeartbeat = System.nanoTime() - 
lastHeartbeatTimestamp;
+
+    final double[] intervalArray =
+        heartbeatIntervals.stream().mapToDouble(Double::doubleValue).toArray();
+    return new PhiAccrual(
+        intervalArray,
+        timeElapsedSinceLastHeartbeat,
+        minHeartbeatStdNs,
+        acceptableHeartbeatPauseNs);
+  }
+
+  /**
+   * The φ Accrual Failure Detector implementation. See <a
+   * 
href="https://doc.akka.io/libraries/akka-core/current/typed/failure-detector.html">φ
+   * Accrual</a>
+   */
+  static final class PhiAccrual {
+    /*
+     * All the heartbeat related intervals within this class should be 
calculated in unit of nanoseconds
+     */
+    private final double[] heartbeatIntervals;
+    private final long timeElapsedSinceLastHeartbeat;
+    private final long minHeartbeatStd;
+    private final long acceptableHeartbeatPause;
+
+    PhiAccrual(
+        double[] heartbeatIntervals,
+        long timeElapsedSinceLastHeartbeat,
+        long minHeartbeatStd,
+        long acceptableHeartbeatPause) {
+      Preconditions.checkArgument(heartbeatIntervals.length > 0);
+      Preconditions.checkArgument(timeElapsedSinceLastHeartbeat >= 0);
+      this.heartbeatIntervals = heartbeatIntervals;
+      this.timeElapsedSinceLastHeartbeat = timeElapsedSinceLastHeartbeat;
+      this.minHeartbeatStd = minHeartbeatStd;
+      this.acceptableHeartbeatPause = acceptableHeartbeatPause;
+    }
+
+    /**
+     * @return phi value given the heartbeat interval history
+     */
+    double phi() {
+      final DescriptiveStatistics ds = new 
DescriptiveStatistics(heartbeatIntervals);
+      double mean = ds.getMean();
+      double std = ds.getStandardDeviation();
+
+      /* ensure the std is valid */
+      std = Math.max(std, minHeartbeatStd);
+
+      /* add tolerance specified by acceptableHeartbeatPause */
+      mean += acceptableHeartbeatPause;
+
+      return p(timeElapsedSinceLastHeartbeat, mean, std);
+    }
+
+    /**
+     * Core method for calculating the phi φ coefficient. It uses a logistic 
approximation to the
+     * cumulative normal distribution.
+     *
+     * @param elapsedTime the difference of the times (current - last 
heartbeat timestamp)
+     * @param historyMean the mean of the history distribution
+     * @param historyStd the standard deviation of the history distribution
+     * @return The value of the φ
+     */
+    private double p(double elapsedTime, double historyMean, double 
historyStd) {
+      final double y = (elapsedTime - historyMean) / historyStd;
+      /* Math.exp safely returns Double.POSITIVE_INFINITY when it 
overflows. */
+      double e = Math.exp(-y * (1.5976 + 0.070566 * y * y));
+      if (elapsedTime > historyMean) {
+        return -Math.log10(e / (1.0 + e));
+      } else {
+        return -Math.log10(1.0 - 1.0 / (1.0 + e));
+      }
+    }
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/AINodeHeartbeatCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/AINodeHeartbeatCache.java
index 50ad19c530e..d60f3195ff9 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/AINodeHeartbeatCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/AINodeHeartbeatCache.java
@@ -21,7 +21,10 @@ package org.apache.iotdb.confignode.manager.load.cache.node;
 
 import org.apache.iotdb.common.rpc.thrift.TLoadSample;
 import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class AINodeHeartbeatCache extends BaseNodeCache {
@@ -35,13 +38,12 @@ public class AINodeHeartbeatCache extends BaseNodeCache {
 
   @Override
   public void updateCurrentStatistics(boolean forceUpdate) {
-    NodeHeartbeatSample lastSample = null;
+    NodeHeartbeatSample lastSample;
+    final List<AbstractHeartbeatSample> heartbeatHistory;
     synchronized (slidingWindow) {
-      if (!slidingWindow.isEmpty()) {
-        lastSample = (NodeHeartbeatSample) getLastSample();
-      }
+      lastSample = (NodeHeartbeatSample) getLastSample();
+      heartbeatHistory = Collections.unmodifiableList(slidingWindow);
     }
-    long lastSendTime = lastSample == null ? 0 : 
lastSample.getSampleLogicalTimestamp();
 
     /* Update load sample */
     if (lastSample != null && lastSample.isSetLoadSample()) {
@@ -54,7 +56,8 @@ public class AINodeHeartbeatCache extends BaseNodeCache {
     long currentNanoTime = System.nanoTime();
     if (lastSample != null && 
NodeStatus.Removing.equals(lastSample.getStatus())) {
       status = NodeStatus.Removing;
-    } else if (currentNanoTime - lastSendTime > HEARTBEAT_TIMEOUT_TIME_IN_NS) {
+    } else if (!failureDetector.isAvailable(heartbeatHistory)) {
+      /* Failure detector decides that this AINode is UNKNOWN */
       status = NodeStatus.Unknown;
     } else if (lastSample != null) {
       status = lastSample.getStatus();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
index d6a41285c54..1f6eda65f50 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
@@ -21,6 +21,10 @@ package org.apache.iotdb.confignode.manager.load.cache.node;
 
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+
+import java.util.Collections;
+import java.util.List;
 
 /** Heartbeat cache for cluster ConfigNodes. */
 public class ConfigNodeHeartbeatCache extends BaseNodeCache {
@@ -51,18 +55,20 @@ public class ConfigNodeHeartbeatCache extends BaseNodeCache 
{
     }
 
     NodeHeartbeatSample lastSample;
+    final List<AbstractHeartbeatSample> heartbeatHistory;
     synchronized (slidingWindow) {
       lastSample = (NodeHeartbeatSample) getLastSample();
+      heartbeatHistory = Collections.unmodifiableList(slidingWindow);
     }
-    long lastSendTime = lastSample == null ? 0 : 
lastSample.getSampleLogicalTimestamp();
 
     // Update Node status
     NodeStatus status;
     long currentNanoTime = System.nanoTime();
     if (lastSample == null) {
+      /* First heartbeat not received from this ConfigNode, status is UNKNOWN 
*/
       status = NodeStatus.Unknown;
-    } else if (currentNanoTime - lastSendTime > HEARTBEAT_TIMEOUT_TIME_IN_NS) {
-      // TODO: Optimize Unknown judge logic
+    } else if (!failureDetector.isAvailable(heartbeatHistory)) {
+      /* Failure detector decides that this ConfigNode is UNKNOWN */
       status = NodeStatus.Unknown;
     } else {
       status = lastSample.getStatus();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
index 6654d3f1f15..a5293d8b397 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
@@ -21,10 +21,13 @@ package org.apache.iotdb.confignode.manager.load.cache.node;
 
 import org.apache.iotdb.common.rpc.thrift.TLoadSample;
 import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
 /** Heartbeat cache for cluster DataNodes. */
@@ -49,10 +52,11 @@ public class DataNodeHeartbeatCache extends BaseNodeCache {
     }
 
     NodeHeartbeatSample lastSample;
+    final List<AbstractHeartbeatSample> heartbeatHistory;
     synchronized (slidingWindow) {
       lastSample = (NodeHeartbeatSample) getLastSample();
+      heartbeatHistory = Collections.unmodifiableList(slidingWindow);
     }
-    long lastSendTime = lastSample == null ? 0 : 
lastSample.getSampleLogicalTimestamp();
 
     /* Update load sample */
     if (lastSample != null && lastSample.isSetLoadSample()) {
@@ -64,9 +68,10 @@ public class DataNodeHeartbeatCache extends BaseNodeCache {
     String statusReason = null;
     long currentNanoTime = System.nanoTime();
     if (lastSample == null) {
+      /* First heartbeat not received from this DataNode, status is UNKNOWN */
       status = NodeStatus.Unknown;
-    } else if (currentNanoTime - lastSendTime > HEARTBEAT_TIMEOUT_TIME_IN_NS) {
-      // TODO: Optimize Unknown judge logic
+    } else if (!failureDetector.isAvailable(heartbeatHistory)) {
+      /* Failure detector decides that this DataNode is UNKNOWN */
       status = NodeStatus.Unknown;
     } else {
       status = lastSample.getStatus();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
index ecb4aebd10b..b2eb95845f9 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
@@ -20,8 +20,12 @@
 package org.apache.iotdb.confignode.manager.load.cache.region;
 
 import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
 import org.apache.iotdb.confignode.manager.load.cache.AbstractLoadCache;
 
+import java.util.Collections;
+import java.util.List;
+
 /**
  * RegionCache caches the RegionHeartbeatSamples of a Region. Update and cache 
the current
  * statistics of the Region based on the latest RegionHeartbeatSample.
@@ -36,17 +40,19 @@ public class RegionCache extends AbstractLoadCache {
   @Override
   public synchronized void updateCurrentStatistics(boolean forceUpdate) {
     RegionHeartbeatSample lastSample;
+    List<AbstractHeartbeatSample> history;
     synchronized (slidingWindow) {
       lastSample = (RegionHeartbeatSample) getLastSample();
+      history = Collections.unmodifiableList(slidingWindow);
     }
 
     RegionStatus status;
     long currentNanoTime = System.nanoTime();
     if (lastSample == null) {
+      /* First heartbeat not received from this region, status is UNKNOWN */
       status = RegionStatus.Unknown;
-    } else if (currentNanoTime - lastSample.getSampleLogicalTimestamp()
-        > HEARTBEAT_TIMEOUT_TIME_IN_NS) {
-      // TODO: Optimize Unknown judge logic
+    } else if (!failureDetector.isAvailable(history)) {
+      /* Failure detector decides that this region is UNKNOWN */
       status = RegionStatus.Unknown;
     } else {
       status = lastSample.getStatus();
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/detector/DetectorTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/detector/DetectorTest.java
new file mode 100644
index 00000000000..a56a7166646
--- /dev/null
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/detector/DetectorTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.cache.detector;
+
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+public class DetectorTest {
+
+  final long sec = 1_000_000_000L;
+  final FixedDetector fixedDetector = new FixedDetector(20 * sec);
+  final PhiAccrualDetector phiAccrualDetector =
+      new PhiAccrualDetector(30, 10 * sec, (long) (0.2 * sec), 0, 
fixedDetector);
+
+  private double getPhi(long elapsed, double[] intervals, long minStd, long 
pause) {
+    final PhiAccrualDetector.PhiAccrual p =
+        new PhiAccrualDetector.PhiAccrual(intervals, elapsed, minStd, pause);
+    return p.phi();
+  }
+
+  private void assertInRange(double value, double start, double end) {
+    Assert.assertTrue(value > start);
+    Assert.assertTrue(value < end);
+  }
+
+  @Test
+  public void testFixedDetector() {
+    final long lastHeartbeatTs = System.nanoTime() - 21 * sec;
+    final List<AbstractHeartbeatSample> history =
+        Collections.singletonList(new NodeHeartbeatSample(lastHeartbeatTs, 
NodeStatus.Running));
+    Assert.assertFalse(fixedDetector.isAvailable(history));
+
+    final long lastAvailableHeartbeat = System.nanoTime() - 18 * sec;
+    final List<AbstractHeartbeatSample> history2 =
+        Collections.singletonList(
+            new NodeHeartbeatSample(lastAvailableHeartbeat, 
NodeStatus.Running));
+    Assert.assertTrue(fixedDetector.isAvailable(history2));
+  }
+
+  @Test
+  public void testPhiCalculation1() {
+    /* (mean interval, min std, acceptable pause) = (1000, 200, 0) */
+    final double[] heartbeatIntervals = {1000, 1000, 1000, 1000, 1000};
+    final long minStd = 200;
+    final long pause = 0;
+
+    assertInRange(getPhi(1000, heartbeatIntervals, minStd, pause), 0, 1);
+    assertInRange(getPhi(2000, heartbeatIntervals, minStd, pause), 5, 10);
+    assertInRange(getPhi(3000, heartbeatIntervals, minStd, pause), 35, 50);
+  }
+
+  @Test
+  public void testPhiCalculation2() {
+    /* (mean interval, min std, acceptable pause) = (1000, 300, 0) */
+    final double[] heartbeatIntervals = {1000, 1000, 1000, 1000, 1000};
+    final long minStd = 300;
+    final long pause = 0;
+
+    assertInRange(getPhi(1000, heartbeatIntervals, minStd, pause), 0, 1);
+    assertInRange(getPhi(2000, heartbeatIntervals, minStd, pause), 1, 5);
+    assertInRange(getPhi(3000, heartbeatIntervals, minStd, pause), 10, 15);
+  }
+
+  @Test
+  public void testPhiCalculation3() {
+    /* (mean interval, min std, acceptable pause) = (1000, 200, 3000) */
+    final double[] heartbeatIntervals = {1000, 1000, 1000, 1000, 1000};
+    final long minStd = 200;
+    final long pause = 3000;
+
+    assertInRange(getPhi(1000 + pause, heartbeatIntervals, minStd, pause), 0, 
1);
+    assertInRange(getPhi(2000 + pause, heartbeatIntervals, minStd, pause), 5, 
10);
+    assertInRange(getPhi(3000 + pause, heartbeatIntervals, minStd, pause), 35, 
50);
+  }
+
+  /**
+   * When a node has not responded for longer than the accepted GC pause, Phi Accrual can detect
+   * the problem more quickly than the Fixed detector. In this case, the accepted pause is 10s but
+   * no heartbeat has been received for 13s.
+   */
+  @Test
+  public void testComparisonQuickFailureDetection() {
+    long[] interval = new long[] {sec, sec, sec};
+    List<AbstractHeartbeatSample> history = fromInterval(interval, 13 * sec);
+    Assert.assertTrue(fixedDetector.isAvailable(history));
+    Assert.assertFalse(phiAccrualDetector.isAvailable(history));
+  }
+
+  /**
+   * When the system load is high, we may observe an exceptionally long GC pause. The first such
+   * pause is a false positive for Phi: here the GC pause is 15s (longer than the accepted 10s),
+   * so Phi reports the node as unavailable while Fixed still considers it available.
+   */
+  @Test
+  public void testFalsePositiveOnExceptionallyLongGCPause() {
+    long[] interval = new long[] {sec, sec, sec};
+    long gcPause = 15 * sec;
+    List<AbstractHeartbeatSample> history = fromInterval(interval, gcPause + 2 
* sec);
+    Assert.assertTrue(fixedDetector.isAvailable(history));
+    Assert.assertFalse(phiAccrualDetector.isAvailable(history));
+  }
+
+  /**
+   * When the system load is high, we may observe exceptionally long GC pauses. If such pauses are
+   * frequent, Phi adapts to the new environment. In this case there are 2 long GC pauses in the
+   * history; when a new 21s pause occurs, Phi treats it as normal while Fixed reports a failure.
+   */
+  @Test
+  public void testPhiAdaptionToFrequentGCPause() {
+    long[] interval =
+        new long[] {
+          sec,
+          sec,
+          sec,
+          15 * sec,
+          (long) (0.1 * sec),
+          sec,
+          sec,
+          sec,
+          15 * sec,
+          (long) (0.1 * sec),
+          sec,
+          sec
+        };
+    List<AbstractHeartbeatSample> history = fromInterval(interval, 21 * sec);
+    Assert.assertFalse(fixedDetector.isAvailable(history));
+    Assert.assertTrue(phiAccrualDetector.isAvailable(history));
+  }
+
+  /**
+   * If the Phi detector has not received enough samples, it falls back to the Fixed detector.
+   */
+  @Test
+  public void testColdStart() {
+    final PhiAccrualDetector coldStartPhi =
+        new PhiAccrualDetector(30, 10 * sec, (long) (0.2 * sec), 60, 
fixedDetector);
+    long[] interval = new long[] {sec, sec, sec};
+    List<AbstractHeartbeatSample> history = fromInterval(interval, 21 * sec);
+    Assert.assertFalse(fixedDetector.isAvailable(history));
+    Assert.assertFalse(coldStartPhi.isAvailable(history));
+  }
+
+  private List<AbstractHeartbeatSample> fromInterval(long[] interval, long 
timeElapsed) {
+    long now = System.nanoTime();
+    long begin = now - timeElapsed;
+    List<AbstractHeartbeatSample> sample = new LinkedList<>();
+    sample.add(new NodeHeartbeatSample(begin, NodeStatus.Running));
+    for (int i = interval.length - 1; i >= 0; i--) {
+      begin -= interval[i];
+      sample.add(0, new NodeHeartbeatSample(begin, NodeStatus.Running));
+    }
+    return sample;
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index cbd09fb784d..a853a2dc8a4 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -670,6 +670,26 @@ time_partition_interval=604800000
 # Datatype: long
 heartbeat_interval_in_ms=1000
 
+# Default failure detector, enum from {fixed, phi_accrual}
+# effectiveMode: restart
+# Datatype: string
+failure_detector=phi_accrual
+
+# Fixed failure detector: threshold of time elapsed without receiving heartbeat replies
+# effectiveMode: restart
+# Datatype: long
+failure_detector_fixed_threshold_in_ms=20000
+
+# Phi accrual: threshold to determine a node is down
+# effectiveMode: restart
+# Datatype: long
+failure_detector_phi_threshold=30
+
+# Phi accrual: acceptable heartbeat pause time in ms (e.g. due to JVM GC)
+# effectiveMode: restart
+# Datatype: long
+failure_detector_phi_acceptable_pause_in_ms=10000
+
 # Disk remaining threshold at which DataNode is set to ReadOnly status
 # effectiveMode: restart
 # Datatype: double(percentage)


Reply via email to