This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch heartbeat-timestamp-improvement
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to
refs/heads/heartbeat-timestamp-improvement by this push:
new 3287794c32f Finish
3287794c32f is described below
commit 3287794c32f131d4467b9038dc6b2817c3adbf79
Author: YongzaoDan <[email protected]>
AuthorDate: Wed Jan 17 14:48:28 2024 +0800
Finish
---
.../client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java | 2 +-
.../org/apache/iotdb/confignode/manager/load/cache/LoadCache.java | 2 +-
.../iotdb/confignode/manager/load/cache/node/BaseNodeCache.java | 2 +-
.../manager/load/cache/node/ConfigNodeHeartbeatCache.java | 2 +-
.../confignode/manager/load/cache/node/DataNodeHeartbeatCache.java | 2 +-
.../confignode/manager/load/cache/node/NodeHeartbeatSample.java | 6 ++++--
.../iotdb/confignode/manager/load/cache/node/NodeStatistics.java | 2 +-
.../iotdb/confignode/manager/load/cache/region/RegionCache.java | 6 +++---
.../iotdb/confignode/manager/load/service/HeartbeatService.java | 2 +-
.../confignode/service/thrift/ConfigNodeRPCServiceProcessor.java | 2 +-
.../manager/load/balancer/router/priority/GreedyPriorityTest.java | 6 +++---
.../load/balancer/router/priority/LeaderPriorityBalancerTest.java | 6 +++---
.../apache/iotdb/confignode/manager/load/cache/NodeCacheTest.java | 6 +++---
.../iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java | 6 +++---
14 files changed, 27 insertions(+), 25 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index 9b612db6f4e..99649e7b715 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -72,7 +72,7 @@ public class DataNodeHeartbeatHandler implements
AsyncMethodCallback<TDataNodeHe
@Override
public void onComplete(TDataNodeHeartbeatResp heartbeatResp) {
- long receiveTime = System.currentTimeMillis();
+ long receiveTime = System.nanoTime();
// Update NodeCache
loadCache.cacheDataNodeHeartbeatSample(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index e7900c7dbaa..3ecc795ebf0 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -144,7 +144,7 @@ public class LoadCache {
* @param resp the heartbeat response
*/
public void cacheConfigNodeHeartbeatSample(int nodeId,
TConfigNodeHeartbeatResp resp) {
- long receiveTime = System.currentTimeMillis();
+ long receiveTime = System.nanoTime();
nodeCacheMap
.computeIfAbsent(nodeId, empty -> new ConfigNodeHeartbeatCache(nodeId))
.cacheHeartbeatSample(new NodeHeartbeatSample(resp, receiveTime));
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/BaseNodeCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/BaseNodeCache.java
index 65a1cccd8ea..5c9142f5dc2 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/BaseNodeCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/BaseNodeCache.java
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
public abstract class BaseNodeCache {
// When the response time of heartbeat is more than 20s, the Node is
considered as down
- public static final int HEARTBEAT_TIMEOUT_TIME = 20_000;
+ public static final long HEARTBEAT_TIMEOUT_TIME_IN_NS = 20_000_000_000L; // 20 s in ns; exceeds int range
// Max heartbeat cache samples store size
public static final int MAXIMUM_WINDOW_SIZE = 100;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
index 4e7ee0706d4..ed06a894f73 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
@@ -63,7 +63,7 @@ public class ConfigNodeHeartbeatCache extends BaseNodeCache {
// Update Node status
NodeStatus status = null;
// TODO: Optimize judge logic
- if (System.currentTimeMillis() - lastSendTime > HEARTBEAT_TIMEOUT_TIME) {
+ if (System.nanoTime() - lastSendTime > HEARTBEAT_TIMEOUT_TIME_IN_NS) {
status = NodeStatus.Unknown;
} else if (lastSample != null) {
status = lastSample.getStatus();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
index 7e47edb4fd2..d74f3ee3dd3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
@@ -56,7 +56,7 @@ public class DataNodeHeartbeatCache extends BaseNodeCache {
// TODO: Optimize judge logic
if (lastSample != null &&
NodeStatus.Removing.equals(lastSample.getStatus())) {
status = NodeStatus.Removing;
- } else if (System.currentTimeMillis() - lastSendTime >
HEARTBEAT_TIMEOUT_TIME) {
+ } else if (System.nanoTime() - lastSendTime >
HEARTBEAT_TIMEOUT_TIME_IN_NS) {
status = NodeStatus.Unknown;
} else if (lastSample != null) {
status = lastSample.getStatus();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java
index f24654e6126..698d0f23909 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.manager.load.cache.node;
import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TLoadSample;
@@ -35,7 +36,7 @@ public class NodeHeartbeatSample {
private TLoadSample loadSample = null;
- /** Constructor for ConfigNode sample. */
+ @TestOnly
public NodeHeartbeatSample(long sendTimestamp, long receiveTimestamp) {
this.sendTimestamp = sendTimestamp;
this.receiveTimestamp = receiveTimestamp;
@@ -56,6 +57,7 @@ public class NodeHeartbeatSample {
}
}
+ /** Constructor for ConfigNode sample. */
public NodeHeartbeatSample(TConfigNodeHeartbeatResp heartbeatResp, long
receiveTimestamp) {
this.sendTimestamp = heartbeatResp.getTimestamp();
this.receiveTimestamp = receiveTimestamp;
@@ -96,7 +98,7 @@ public class NodeHeartbeatSample {
* @return A NodeHeartbeatSample that only contain timestamp and NodeStatus
*/
public static NodeHeartbeatSample generateDefaultSample(NodeStatus status) {
- long currentTime = System.currentTimeMillis();
+ long currentTime = System.nanoTime();
return new NodeHeartbeatSample(
new TDataNodeHeartbeatResp(currentTime,
status.getStatus()).setStatusReason(null),
currentTime);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
index 58b8d8a0113..218bd5c0d45 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeStatistics.java
@@ -104,7 +104,7 @@ public class NodeStatistics {
}
public NodeHeartbeatSample convertToNodeHeartbeatSample() {
- long currentTime = System.currentTimeMillis();
+ long currentTime = System.nanoTime();
return new NodeHeartbeatSample(
new TDataNodeHeartbeatResp(currentTime,
status.getStatus()).setStatusReason(statusReason),
currentTime);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
index 0bce99e0e89..3ed0217af72 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
@@ -25,7 +25,7 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import static
org.apache.iotdb.confignode.manager.load.cache.node.BaseNodeCache.HEARTBEAT_TIMEOUT_TIME;
+import static
org.apache.iotdb.confignode.manager.load.cache.node.BaseNodeCache.HEARTBEAT_TIMEOUT_TIME_IN_NS;
import static
org.apache.iotdb.confignode.manager.load.cache.node.BaseNodeCache.MAXIMUM_WINDOW_SIZE;
public class RegionCache {
@@ -61,8 +61,8 @@ public class RegionCache {
RegionStatus status;
if (RegionStatus.Removing.equals(lastSample.getStatus())) {
status = RegionStatus.Removing;
- } else if (System.currentTimeMillis() - lastSample.getSendTimestamp()
- > HEARTBEAT_TIMEOUT_TIME) {
+ } else if (System.nanoTime() - lastSample.getSendTimestamp()
+ > HEARTBEAT_TIMEOUT_TIME_IN_NS) {
status = RegionStatus.Unknown;
} else {
status = lastSample.getStatus();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index 6e733f0b6d6..e6c599aab36 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -122,7 +122,7 @@ public class HeartbeatService {
private TDataNodeHeartbeatReq genHeartbeatReq() {
/* Generate heartbeat request */
TDataNodeHeartbeatReq heartbeatReq = new TDataNodeHeartbeatReq();
- heartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis());
+ heartbeatReq.setHeartbeatTimestamp(System.nanoTime());
// Always sample RegionGroups' leadership as the Region heartbeat
heartbeatReq.setNeedJudgeLeader(true);
// We sample DataNode's load in every 10 heartbeat loop
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index a7e51c30b3b..78a47502e7d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -833,7 +833,7 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
@Override
public TConfigNodeHeartbeatResp
getConfigNodeHeartBeat(TConfigNodeHeartbeatReq heartbeatReq) {
TConfigNodeHeartbeatResp resp = new TConfigNodeHeartbeatResp();
- resp.setTimestamp(System.currentTimeMillis());
+ resp.setTimestamp(heartbeatReq.getTimestamp());
return resp;
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
index defe02aa50d..e7eb2e05c30 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityTest.java
@@ -57,7 +57,7 @@ public class GreedyPriorityTest {
}
/* Build nodeCacheMap */
- long currentTimeMillis = System.currentTimeMillis();
+ long currentTimeNs = System.nanoTime();
Map<Integer, BaseNodeCache> nodeCacheMap = new HashMap<>();
NodeStatus[] statuses =
new NodeStatus[] {
@@ -69,8 +69,8 @@ public class GreedyPriorityTest {
.get(i)
.cacheHeartbeatSample(
new NodeHeartbeatSample(
- new TDataNodeHeartbeatResp(currentTimeMillis,
statuses[i].getStatus()),
- currentTimeMillis));
+ new TDataNodeHeartbeatResp(currentTimeNs,
statuses[i].getStatus()),
+ currentTimeNs));
}
nodeCacheMap.values().forEach(BaseNodeCache::periodicUpdate);
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
index f3d0e0b3eb6..d78f43267dc 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
@@ -58,7 +58,7 @@ public class LeaderPriorityBalancerTest {
}
// Build nodeCacheMap
- long currentTimeMillis = System.currentTimeMillis();
+ long currentTimeNs = System.nanoTime();
Map<Integer, BaseNodeCache> nodeCacheMap = new HashMap<>();
for (int i = 0; i < 6; i++) {
nodeCacheMap.put(i, new DataNodeHeartbeatCache(i));
@@ -67,8 +67,8 @@ public class LeaderPriorityBalancerTest {
.get(i)
.cacheHeartbeatSample(
new NodeHeartbeatSample(
- new TDataNodeHeartbeatResp(currentTimeMillis,
NodeStatus.Running.getStatus()),
- currentTimeMillis));
+ new TDataNodeHeartbeatResp(currentTimeNs,
NodeStatus.Running.getStatus()),
+ currentTimeNs));
}
}
nodeCacheMap.values().forEach(BaseNodeCache::periodicUpdate);
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/NodeCacheTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/NodeCacheTest.java
index 94d6c33d05e..ce79fbf5571 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/NodeCacheTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/NodeCacheTest.java
@@ -38,7 +38,7 @@ public class NodeCacheTest {
Assert.assertEquals(Long.MAX_VALUE, dataNodeHeartbeatCache.getLoadScore());
// Test force update to RunningStatus
- long currentTime = System.currentTimeMillis() - 2000;
+ long currentTime = System.nanoTime() - 2_000_000;
dataNodeHeartbeatCache.forceUpdate(
new NodeHeartbeatSample(
new TDataNodeHeartbeatResp(currentTime,
NodeStatus.Running.getStatus()), currentTime));
@@ -58,7 +58,7 @@ public class NodeCacheTest {
public void periodicUpdateTest() {
// Test DataNode heartbeat cache
DataNodeHeartbeatCache dataNodeHeartbeatCache = new
DataNodeHeartbeatCache(1);
- long currentTime = System.currentTimeMillis();
+ long currentTime = System.nanoTime();
dataNodeHeartbeatCache.cacheHeartbeatSample(
new NodeHeartbeatSample(
new TDataNodeHeartbeatResp(currentTime,
NodeStatus.Running.getStatus()), currentTime));
@@ -68,7 +68,7 @@ public class NodeCacheTest {
// Test ConfigNode heartbeat cache
ConfigNodeHeartbeatCache configNodeHeartbeatCache = new
ConfigNodeHeartbeatCache(2);
- currentTime = System.currentTimeMillis();
+ currentTime = System.nanoTime();
configNodeHeartbeatCache.cacheHeartbeatSample(
new NodeHeartbeatSample(currentTime, currentTime));
Assert.assertTrue(configNodeHeartbeatCache.periodicUpdate());
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
index 95bffed2005..a06fe2f56b2 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
@@ -35,7 +35,7 @@ public class RegionGroupCacheTest {
@Test
public void getRegionStatusTest() {
- long currentTime = System.currentTimeMillis();
+ long currentTime = System.nanoTime();
RegionGroupCache regionGroupCache =
new RegionGroupCache(new
TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1));
regionGroupCache.cacheHeartbeatSample(
@@ -56,7 +56,7 @@ public class RegionGroupCacheTest {
@Test
public void getRegionGroupStatusTest() {
- long currentTime = System.currentTimeMillis();
+ long currentTime = System.nanoTime();
RegionGroupCache runningRegionGroup =
new RegionGroupCache(new
TConsensusGroupId(TConsensusGroupType.DataRegion, 0));
runningRegionGroup.cacheHeartbeatSample(
@@ -120,7 +120,7 @@ public class RegionGroupCacheTest {
@Test
public void forceUpdateTest() {
- long currentTime = System.currentTimeMillis();
+ long currentTime = System.nanoTime();
Map<Integer, RegionHeartbeatSample> heartbeatSampleMap = new HashMap<>();
for (int i = 0; i < 3; i++) {
heartbeatSampleMap.put(