This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 06c0911adca [RTO/RPO] Phi logging/concurrency Improvement (#15092)
06c0911adca is described below
commit 06c0911adca5f2351952f67c41c198f4956a3582
Author: William Song <[email protected]>
AuthorDate: Fri Mar 14 14:22:25 2025 +0800
[RTO/RPO] Phi logging/concurrency Improvement (#15092)
* fix phi issue
* fix phi issue
---
.../load/cache/detector/PhiAccrualDetector.java | 4 +--
.../load/cache/node/AINodeHeartbeatCache.java | 35 ++++++++++----------
.../load/cache/node/ConfigNodeHeartbeatCache.java | 24 +++++++-------
.../load/cache/node/DataNodeHeartbeatCache.java | 37 +++++++++++-----------
.../manager/load/cache/region/RegionCache.java | 24 +++++++-------
5 files changed, 61 insertions(+), 63 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
index a4947e8a3b0..39cc45c1519 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
@@ -71,7 +71,7 @@ public class PhiAccrualDetector implements IFailureDetector {
}
final PhiAccrual phiAccrual = create(history);
final boolean isAvailable = phiAccrual.phi() < (double) this.threshold;
- if (!isAvailable) {
+ if (!isAvailable && LOGGER.isDebugEnabled()) {
// log the status change and dump the heartbeat history for analysis use
final StringBuilder builder = new StringBuilder();
builder.append("[");
@@ -81,7 +81,7 @@ public class PhiAccrualDetector implements IFailureDetector {
}
builder.append(phiAccrual.timeElapsedSinceLastHeartbeat / 1000_000);
builder.append("]");
- LOGGER.info(String.format("Node Down, heartbeat history (ms): %s", builder));
+ LOGGER.debug(String.format("Node Down, heartbeat history (ms): %s", builder));
}
return isAvailable;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/AINodeHeartbeatCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/AINodeHeartbeatCache.java
index d60f3195ff9..187c35802af 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/AINodeHeartbeatCache.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/AINodeHeartbeatCache.java
@@ -40,28 +40,27 @@ public class AINodeHeartbeatCache extends BaseNodeCache {
public void updateCurrentStatistics(boolean forceUpdate) {
NodeHeartbeatSample lastSample;
final List<AbstractHeartbeatSample> heartbeatHistory;
- synchronized (slidingWindow) {
- lastSample = (NodeHeartbeatSample) getLastSample();
- heartbeatHistory = Collections.unmodifiableList(slidingWindow);
- }
-
- /* Update load sample */
- if (lastSample != null && lastSample.isSetLoadSample()) {
- latestLoadSample.set((lastSample.getLoadSample()));
- }
-
/* Update Node status */
NodeStatus status = null;
String statusReason = null;
long currentNanoTime = System.nanoTime();
- if (lastSample != null && NodeStatus.Removing.equals(lastSample.getStatus())) {
- status = NodeStatus.Removing;
- } else if (!failureDetector.isAvailable(heartbeatHistory)) {
- /* Failure detector decides that this AINode is UNKNOWN */
- status = NodeStatus.Unknown;
- } else if (lastSample != null) {
- status = lastSample.getStatus();
- statusReason = lastSample.getStatusReason();
+ synchronized (slidingWindow) {
+ lastSample = (NodeHeartbeatSample) getLastSample();
+ heartbeatHistory = Collections.unmodifiableList(slidingWindow);
+ /* Update load sample */
+ if (lastSample != null && lastSample.isSetLoadSample()) {
+ latestLoadSample.set((lastSample.getLoadSample()));
+ }
+
+ if (lastSample != null && NodeStatus.Removing.equals(lastSample.getStatus())) {
+ status = NodeStatus.Removing;
+ } else if (!failureDetector.isAvailable(heartbeatHistory)) {
+ /* Failure detector decides that this AINode is UNKNOWN */
+ status = NodeStatus.Unknown;
+ } else if (lastSample != null) {
+ status = lastSample.getStatus();
+ statusReason = lastSample.getStatusReason();
+ }
}
long loadScore = NodeStatus.isNormalStatus(status) ? 0 : Long.MAX_VALUE;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
index 1f6eda65f50..a4b8051cb74 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
@@ -55,23 +55,23 @@ public class ConfigNodeHeartbeatCache extends BaseNodeCache {
}
NodeHeartbeatSample lastSample;
+ // Update Node status
+ NodeStatus status;
+ long currentNanoTime = System.nanoTime();
final List<AbstractHeartbeatSample> heartbeatHistory;
synchronized (slidingWindow) {
lastSample = (NodeHeartbeatSample) getLastSample();
heartbeatHistory = Collections.unmodifiableList(slidingWindow);
- }
- // Update Node status
- NodeStatus status;
- long currentNanoTime = System.nanoTime();
- if (lastSample == null) {
- /* First heartbeat not received from this ConfigNode, status is UNKNOWN */
- status = NodeStatus.Unknown;
- } else if (!failureDetector.isAvailable(heartbeatHistory)) {
- /* Failure detector decides that this ConfigNode is UNKNOWN */
- status = NodeStatus.Unknown;
- } else {
- status = lastSample.getStatus();
+ if (lastSample == null) {
+ /* First heartbeat not received from this ConfigNode, status is UNKNOWN */
+ status = NodeStatus.Unknown;
+ } else if (!failureDetector.isAvailable(heartbeatHistory)) {
+ /* Failure detector decides that this ConfigNode is UNKNOWN */
+ status = NodeStatus.Unknown;
+ } else {
+ status = lastSample.getStatus();
+ }
}
/* Update loadScore */
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
index a5293d8b397..87dccb1465d 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
@@ -53,29 +53,28 @@ public class DataNodeHeartbeatCache extends BaseNodeCache {
NodeHeartbeatSample lastSample;
final List<AbstractHeartbeatSample> heartbeatHistory;
- synchronized (slidingWindow) {
- lastSample = (NodeHeartbeatSample) getLastSample();
- heartbeatHistory = Collections.unmodifiableList(slidingWindow);
- }
-
- /* Update load sample */
- if (lastSample != null && lastSample.isSetLoadSample()) {
- latestLoadSample.set(lastSample.getLoadSample());
- }
-
/* Update Node status */
NodeStatus status;
String statusReason = null;
long currentNanoTime = System.nanoTime();
- if (lastSample == null) {
- /* First heartbeat not received from this DataNode, status is UNKNOWN */
- status = NodeStatus.Unknown;
- } else if (!failureDetector.isAvailable(heartbeatHistory)) {
- /* Failure detector decides that this DataNode is UNKNOWN */
- status = NodeStatus.Unknown;
- } else {
- status = lastSample.getStatus();
- statusReason = lastSample.getStatusReason();
+ synchronized (slidingWindow) {
+ lastSample = (NodeHeartbeatSample) getLastSample();
+ heartbeatHistory = Collections.unmodifiableList(slidingWindow);
+ /* Update load sample */
+ if (lastSample != null && lastSample.isSetLoadSample()) {
+ latestLoadSample.set(lastSample.getLoadSample());
+ }
+
+ if (lastSample == null) {
+ /* First heartbeat not received from this DataNode, status is UNKNOWN */
+ status = NodeStatus.Unknown;
+ } else if (!failureDetector.isAvailable(heartbeatHistory)) {
+ /* Failure detector decides that this DataNode is UNKNOWN */
+ status = NodeStatus.Unknown;
+ } else {
+ status = lastSample.getStatus();
+ statusReason = lastSample.getStatusReason();
+ }
}
/* Update loadScore */
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
index b2eb95845f9..9210585428d 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
@@ -44,20 +44,20 @@ public class RegionCache extends AbstractLoadCache {
synchronized (slidingWindow) {
lastSample = (RegionHeartbeatSample) getLastSample();
history = Collections.unmodifiableList(slidingWindow);
- }
- RegionStatus status;
- long currentNanoTime = System.nanoTime();
- if (lastSample == null) {
- /* First heartbeat not received from this region, status is UNKNOWN */
- status = RegionStatus.Unknown;
- } else if (!failureDetector.isAvailable(history)) {
- /* Failure detector decides that this region is UNKNOWN */
- status = RegionStatus.Unknown;
- } else {
- status = lastSample.getStatus();
+ RegionStatus status;
+ long currentNanoTime = System.nanoTime();
+ if (lastSample == null) {
+ /* First heartbeat not received from this region, status is UNKNOWN */
+ status = RegionStatus.Unknown;
+ } else if (!failureDetector.isAvailable(history)) {
+ /* Failure detector decides that this region is UNKNOWN */
+ status = RegionStatus.Unknown;
+ } else {
+ status = lastSample.getStatus();
+ }
+ this.currentStatistics.set(new RegionStatistics(currentNanoTime, status));
}
- this.currentStatistics.set(new RegionStatistics(currentNanoTime, status));
}
public RegionStatistics getCurrentStatistics() {