This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch heartbeat-timestamp-improvement
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to 
refs/heads/heartbeat-timestamp-improvement by this push:
     new 3287794c32f Finish
3287794c32f is described below

commit 3287794c32f131d4467b9038dc6b2817c3adbf79
Author: YongzaoDan <[email protected]>
AuthorDate: Wed Jan 17 14:48:28 2024 +0800

    Finish
---
 .../client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java   | 2 +-
 .../org/apache/iotdb/confignode/manager/load/cache/LoadCache.java   | 2 +-
 .../iotdb/confignode/manager/load/cache/node/BaseNodeCache.java     | 2 +-
 .../manager/load/cache/node/ConfigNodeHeartbeatCache.java           | 2 +-
 .../confignode/manager/load/cache/node/DataNodeHeartbeatCache.java  | 2 +-
 .../confignode/manager/load/cache/node/NodeHeartbeatSample.java     | 6 ++++--
 .../iotdb/confignode/manager/load/cache/node/NodeStatistics.java    | 2 +-
 .../iotdb/confignode/manager/load/cache/region/RegionCache.java     | 6 +++---
 .../iotdb/confignode/manager/load/service/HeartbeatService.java     | 2 +-
 .../confignode/service/thrift/ConfigNodeRPCServiceProcessor.java    | 2 +-
 .../manager/load/balancer/router/priority/GreedyPriorityTest.java   | 6 +++---
 .../load/balancer/router/priority/LeaderPriorityBalancerTest.java   | 6 +++---
 .../apache/iotdb/confignode/manager/load/cache/NodeCacheTest.java   | 6 +++---
 .../iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java   | 6 +++---
 14 files changed, 27 insertions(+), 25 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index 9b612db6f4e..99649e7b715 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -72,7 +72,7 @@ public class DataNodeHeartbeatHandler implements 
AsyncMethodCallback<TDataNodeHe
 
   @Override
   public void onComplete(TDataNodeHeartbeatResp heartbeatResp) {
-    long receiveTime = System.currentTimeMillis();
+    long receiveTime = System.nanoTime();
 
     // Update NodeCache
     loadCache.cacheDataNodeHeartbeatSample(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index e7900c7dbaa..3ecc795ebf0 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -144,7 +144,7 @@ public class LoadCache {
    * @param resp the heartbeat response
    */
   public void cacheConfigNodeHeartbeatSample(int nodeId, 
TConfigNodeHeartbeatResp resp) {
-    long receiveTime = System.currentTimeMillis();
+    long receiveTime = System.nanoTime();
     nodeCacheMap
         .computeIfAbsent(nodeId, empty -> new ConfigNodeHeartbeatCache(nodeId))
         .cacheHeartbeatSample(new NodeHeartbeatSample(resp, receiveTime));
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/BaseNodeCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/BaseNodeCache.java
index 65a1cccd8ea..5c9142f5dc2 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/BaseNodeCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/BaseNodeCache.java
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
 public abstract class BaseNodeCache {
 
   // When the response time of heartbeat is more than 20s, the Node is 
considered as down
-  public static final int HEARTBEAT_TIMEOUT_TIME = 20_000;
+  public static final long HEARTBEAT_TIMEOUT_TIME_IN_NS = 20_000_000_000L; // 20s in ns
 
   // Max heartbeat cache samples store size
   public static final int MAXIMUM_WINDOW_SIZE = 100;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
index 4e7ee0706d4..ed06a894f73 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
@@ -63,7 +63,7 @@ public class ConfigNodeHeartbeatCache extends BaseNodeCache {
     // Update Node status
     NodeStatus status = null;
     // TODO: Optimize judge logic
-    if (System.currentTimeMillis() - lastSendTime > HEARTBEAT_TIMEOUT_TIME) {
+    if (System.nanoTime() - lastSendTime > HEARTBEAT_TIMEOUT_TIME_IN_NS) {
       status = NodeStatus.Unknown;
     } else if (lastSample != null) {
       status = lastSample.getStatus();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
index 7e47edb4fd2..d74f3ee3dd3 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
@@ -56,7 +56,7 @@ public class DataNodeHeartbeatCache extends BaseNodeCache {
     // TODO: Optimize judge logic
     if (lastSample != null && 
NodeStatus.Removing.equals(lastSample.getStatus())) {
       status = NodeStatus.Removing;
-    } else if (System.currentTimeMillis() - lastSendTime > 
HEARTBEAT_TIMEOUT_TIME) {
+    } else if (System.nanoTime() - lastSendTime > 
HEARTBEAT_TIMEOUT_TIME_IN_NS) {
       status = NodeStatus.Unknown;
     } else if (lastSample != null) {
       status = lastSample.getStatus();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java
index f24654e6126..698d0f23909 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.manager.load.cache.node;
 
 import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TLoadSample;
@@ -35,7 +36,7 @@ public class NodeHeartbeatSample {
 
   private TLoadSample loadSample = null;
 
-  /** Constructor for ConfigNode sample. */
+  @TestOnly
   public NodeHeartbeatSample(long sendTimestamp, long receiveTimestamp) {
     this.sendTimestamp = sendTimestamp;
     this.receiveTimestamp = receiveTimestamp;
@@ -56,6 +57,7 @@ public class NodeHeartbeatSample {
     }
   }
 
+  /** Constructor for ConfigNode sample. */
   public NodeHeartbeatSample(TConfigNodeHeartbeatResp heartbeatResp, long 
receiveTimestamp) {
     this.sendTimestamp = heartbeatResp.getTimestamp();
     this.receiveTimestamp = receiveTimestamp;
@@ -96,7 +98,7 @@ public class NodeHeartbeatSample {
    * @return A NodeHeartbeatSample that only contain timestamp and NodeStatus
    */
   public static NodeHeartbeatSample generateDefaultSample(NodeStatus status) {
-    long currentTime = System.currentTimeMillis();
+    long currentTime = System.nanoTime();
     return new NodeHeartbeatSample(
         new TDataNodeHeartbeatResp(currentTime, 
status.getStatus()).setStatusReason(null),
         currentTime);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
index 58b8d8a0113..218bd5c0d45 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
@@ -104,7 +104,7 @@ public class NodeStatistics {
   }
 
   public NodeHeartbeatSample convertToNodeHeartbeatSample() {
-    long currentTime = System.currentTimeMillis();
+    long currentTime = System.nanoTime();
     return new NodeHeartbeatSample(
         new TDataNodeHeartbeatResp(currentTime, 
status.getStatus()).setStatusReason(statusReason),
         currentTime);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
index 0bce99e0e89..3ed0217af72 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
@@ -25,7 +25,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 
-import static 
org.apache.iotdb.confignode.manager.load.cache.node.BaseNodeCache.HEARTBEAT_TIMEOUT_TIME;
+import static 
org.apache.iotdb.confignode.manager.load.cache.node.BaseNodeCache.HEARTBEAT_TIMEOUT_TIME_IN_NS;
 import static 
org.apache.iotdb.confignode.manager.load.cache.node.BaseNodeCache.MAXIMUM_WINDOW_SIZE;
 
 public class RegionCache {
@@ -61,8 +61,8 @@ public class RegionCache {
     RegionStatus status;
     if (RegionStatus.Removing.equals(lastSample.getStatus())) {
       status = RegionStatus.Removing;
-    } else if (System.currentTimeMillis() - lastSample.getSendTimestamp()
-        > HEARTBEAT_TIMEOUT_TIME) {
+    } else if (System.nanoTime() - lastSample.getSendTimestamp()
+        > HEARTBEAT_TIMEOUT_TIME_IN_NS) {
       status = RegionStatus.Unknown;
     } else {
       status = lastSample.getStatus();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index 6e733f0b6d6..e6c599aab36 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -122,7 +122,7 @@ public class HeartbeatService {
   private TDataNodeHeartbeatReq genHeartbeatReq() {
     /* Generate heartbeat request */
     TDataNodeHeartbeatReq heartbeatReq = new TDataNodeHeartbeatReq();
-    heartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis());
+    heartbeatReq.setHeartbeatTimestamp(System.nanoTime());
     // Always sample RegionGroups' leadership as the Region heartbeat
     heartbeatReq.setNeedJudgeLeader(true);
     // We sample DataNode's load in every 10 heartbeat loop
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index a7e51c30b3b..78a47502e7d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -833,7 +833,7 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
   @Override
   public TConfigNodeHeartbeatResp 
getConfigNodeHeartBeat(TConfigNodeHeartbeatReq heartbeatReq) {
     TConfigNodeHeartbeatResp resp = new TConfigNodeHeartbeatResp();
-    resp.setTimestamp(System.currentTimeMillis());
+    resp.setTimestamp(heartbeatReq.getTimestamp());
     return resp;
   }
 
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
index defe02aa50d..e7eb2e05c30 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
@@ -57,7 +57,7 @@ public class GreedyPriorityTest {
     }
 
     /* Build nodeCacheMap */
-    long currentTimeMillis = System.currentTimeMillis();
+    long currentTimeNs = System.nanoTime();
     Map<Integer, BaseNodeCache> nodeCacheMap = new HashMap<>();
     NodeStatus[] statuses =
         new NodeStatus[] {
@@ -69,8 +69,8 @@ public class GreedyPriorityTest {
           .get(i)
           .cacheHeartbeatSample(
               new NodeHeartbeatSample(
-                  new TDataNodeHeartbeatResp(currentTimeMillis, 
statuses[i].getStatus()),
-                  currentTimeMillis));
+                  new TDataNodeHeartbeatResp(currentTimeNs, 
statuses[i].getStatus()),
+                  currentTimeNs));
     }
     nodeCacheMap.values().forEach(BaseNodeCache::periodicUpdate);
 
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
index f3d0e0b3eb6..d78f43267dc 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
@@ -58,7 +58,7 @@ public class LeaderPriorityBalancerTest {
     }
 
     // Build nodeCacheMap
-    long currentTimeMillis = System.currentTimeMillis();
+    long currentTimeNs = System.nanoTime();
     Map<Integer, BaseNodeCache> nodeCacheMap = new HashMap<>();
     for (int i = 0; i < 6; i++) {
       nodeCacheMap.put(i, new DataNodeHeartbeatCache(i));
@@ -67,8 +67,8 @@ public class LeaderPriorityBalancerTest {
             .get(i)
             .cacheHeartbeatSample(
                 new NodeHeartbeatSample(
-                    new TDataNodeHeartbeatResp(currentTimeMillis, 
NodeStatus.Running.getStatus()),
-                    currentTimeMillis));
+                    new TDataNodeHeartbeatResp(currentTimeNs, 
NodeStatus.Running.getStatus()),
+                    currentTimeNs));
       }
     }
     nodeCacheMap.values().forEach(BaseNodeCache::periodicUpdate);
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/NodeCacheTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/NodeCacheTest.java
index 94d6c33d05e..ce79fbf5571 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/NodeCacheTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/NodeCacheTest.java
@@ -38,7 +38,7 @@ public class NodeCacheTest {
     Assert.assertEquals(Long.MAX_VALUE, dataNodeHeartbeatCache.getLoadScore());
 
     // Test force update to RunningStatus
-    long currentTime = System.currentTimeMillis() - 2000;
+    long currentTime = System.nanoTime() - 2_000_000;
     dataNodeHeartbeatCache.forceUpdate(
         new NodeHeartbeatSample(
             new TDataNodeHeartbeatResp(currentTime, 
NodeStatus.Running.getStatus()), currentTime));
@@ -58,7 +58,7 @@ public class NodeCacheTest {
   public void periodicUpdateTest() {
     // Test DataNode heartbeat cache
     DataNodeHeartbeatCache dataNodeHeartbeatCache = new 
DataNodeHeartbeatCache(1);
-    long currentTime = System.currentTimeMillis();
+    long currentTime = System.nanoTime();
     dataNodeHeartbeatCache.cacheHeartbeatSample(
         new NodeHeartbeatSample(
             new TDataNodeHeartbeatResp(currentTime, 
NodeStatus.Running.getStatus()), currentTime));
@@ -68,7 +68,7 @@ public class NodeCacheTest {
 
     // Test ConfigNode heartbeat cache
     ConfigNodeHeartbeatCache configNodeHeartbeatCache = new 
ConfigNodeHeartbeatCache(2);
-    currentTime = System.currentTimeMillis();
+    currentTime = System.nanoTime();
     configNodeHeartbeatCache.cacheHeartbeatSample(
         new NodeHeartbeatSample(currentTime, currentTime));
     Assert.assertTrue(configNodeHeartbeatCache.periodicUpdate());
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
index 95bffed2005..a06fe2f56b2 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
@@ -35,7 +35,7 @@ public class RegionGroupCacheTest {
 
   @Test
   public void getRegionStatusTest() {
-    long currentTime = System.currentTimeMillis();
+    long currentTime = System.nanoTime();
     RegionGroupCache regionGroupCache =
         new RegionGroupCache(new 
TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1));
     regionGroupCache.cacheHeartbeatSample(
@@ -56,7 +56,7 @@ public class RegionGroupCacheTest {
 
   @Test
   public void getRegionGroupStatusTest() {
-    long currentTime = System.currentTimeMillis();
+    long currentTime = System.nanoTime();
     RegionGroupCache runningRegionGroup =
         new RegionGroupCache(new 
TConsensusGroupId(TConsensusGroupType.DataRegion, 0));
     runningRegionGroup.cacheHeartbeatSample(
@@ -120,7 +120,7 @@ public class RegionGroupCacheTest {
 
   @Test
   public void forceUpdateTest() {
-    long currentTime = System.currentTimeMillis();
+    long currentTime = System.nanoTime();
     Map<Integer, RegionHeartbeatSample> heartbeatSampleMap = new HashMap<>();
     for (int i = 0; i < 3; i++) {
       heartbeatSampleMap.put(

Reply via email to