This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bbc3c703237 Pipe: Fixed the bug where a timeout in milliseconds was treated as seconds (#17590)
bbc3c703237 is described below
commit bbc3c703237ee35a958c964b87815c578b040471
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 6 10:36:23 2026 +0800
Pipe: Fixed the bug where a timeout in milliseconds was treated as seconds (#17590)
---
.../iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java | 2 +-
.../iotdb/commons/pipe/agent/task/PipeTaskAgent.java | 16 ++++++++++++++--
2 files changed, 15 insertions(+), 3 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 031cfd3a62e..06d6a512b30 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -377,7 +377,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
///////////////////////// Heartbeat /////////////////////////
public void collectPipeMetaList(final TDataNodeHeartbeatResp resp) throws
TException {
- if (!tryReadLockWithTimeOut(
+ if (!tryReadLockWithTimeOutInMs(
CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L /
3)) {
return;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 73b543592c1..99d45298794 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -130,6 +130,10 @@ public abstract class PipeTaskAgent {
}
}
+ protected boolean tryReadLockWithTimeOutInMs(final long timeOutInMs) {
+ return tryReadLockWithTimeOut(convertMsToCeilSeconds(timeOutInMs));
+ }
+
protected void releaseReadLock() {
pipeMetaKeeper.releaseReadLock();
}
@@ -148,10 +152,18 @@ public abstract class PipeTaskAgent {
}
}
+ protected boolean tryWriteLockWithTimeOutInMs(final long timeOutInMs) {
+ return tryWriteLockWithTimeOut(convertMsToCeilSeconds(timeOutInMs));
+ }
+
protected void releaseWriteLock() {
pipeMetaKeeper.releaseWriteLock();
}
+ private long convertMsToCeilSeconds(final long timeOutInMs) {
+ return Math.max(1L, (Math.max(0L, timeOutInMs) + 999L) / 1000L);
+ }
+
////////////////////////// Pipe Task Management Entry
//////////////////////////
public TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChanges(
@@ -368,7 +380,7 @@ public abstract class PipeTaskAgent {
public List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChanges(
final List<PipeMeta> pipeMetaListFromCoordinator) {
- if (!tryWriteLockWithTimeOut(
+ if (!tryWriteLockWithTimeOutInMs(
CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L /
3)) {
return null;
}
@@ -1107,7 +1119,7 @@ public abstract class PipeTaskAgent {
public void collectPipeMetaList(final TPipeHeartbeatReq req, final
TPipeHeartbeatResp resp)
throws TException {
- if (!tryReadLockWithTimeOut(
+ if (!tryReadLockWithTimeOutInMs(
CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L /
3)) {
return;
}