This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new aa48de75fea Pipe: Fixed the bug that the timeout ms is regarded as s
(#17590) (#17607)
aa48de75fea is described below
commit aa48de75feacb17347169780bffe2f6a6992a8e3
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 7 09:32:37 2026 +0800
Pipe: Fixed the bug that the timeout ms is regarded as s (#17590) (#17607)
---
.../iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java | 2 +-
.../iotdb/commons/pipe/agent/task/PipeTaskAgent.java | 16 ++++++++++++++--
2 files changed, 15 insertions(+), 3 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index c437507f4b0..9038e7c3a71 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -374,7 +374,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
///////////////////////// Heartbeat /////////////////////////
public void collectPipeMetaList(final TDataNodeHeartbeatResp resp) throws
TException {
- if (!tryReadLockWithTimeOut(
+ if (!tryReadLockWithTimeOutInMs(
CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L /
3)) {
return;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 726bc1b6a1f..fcff317b3f3 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -125,6 +125,10 @@ public abstract class PipeTaskAgent {
}
}
+ protected boolean tryReadLockWithTimeOutInMs(final long timeOutInMs) {
+ return tryReadLockWithTimeOut(convertMsToCeilSeconds(timeOutInMs));
+ }
+
protected void releaseReadLock() {
pipeMetaKeeper.releaseReadLock();
}
@@ -143,10 +147,18 @@ public abstract class PipeTaskAgent {
}
}
+ protected boolean tryWriteLockWithTimeOutInMs(final long timeOutInMs) {
+ return tryWriteLockWithTimeOut(convertMsToCeilSeconds(timeOutInMs));
+ }
+
protected void releaseWriteLock() {
pipeMetaKeeper.releaseWriteLock();
}
+ private long convertMsToCeilSeconds(final long timeOutInMs) {
+ return Math.max(1L, (Math.max(0L, timeOutInMs) + 999L) / 1000L);
+ }
+
////////////////////////// Pipe Task Management Entry
//////////////////////////
public TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChanges(
@@ -363,7 +375,7 @@ public abstract class PipeTaskAgent {
public List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChanges(
final List<PipeMeta> pipeMetaListFromCoordinator) {
- if (!tryWriteLockWithTimeOut(
+ if (!tryWriteLockWithTimeOutInMs(
CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L /
3)) {
return null;
}
@@ -1091,7 +1103,7 @@ public abstract class PipeTaskAgent {
public void collectPipeMetaList(final TPipeHeartbeatReq req, final
TPipeHeartbeatResp resp)
throws TException {
- if (!tryReadLockWithTimeOut(
+ if (!tryReadLockWithTimeOutInMs(
CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L /
3)) {
return;
}