This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new aa48de75fea Pipe: Fixed the bug that the timeout ms is regarded as s (#17590) (#17607)
aa48de75fea is described below

commit aa48de75feacb17347169780bffe2f6a6992a8e3
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 7 09:32:37 2026 +0800

    Pipe: Fixed the bug that the timeout ms is regarded as s (#17590) (#17607)
---
 .../iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  2 +-
 .../iotdb/commons/pipe/agent/task/PipeTaskAgent.java     | 16 ++++++++++++++--
 2 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index c437507f4b0..9038e7c3a71 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -374,7 +374,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
   ///////////////////////// Heartbeat /////////////////////////
 
   public void collectPipeMetaList(final TDataNodeHeartbeatResp resp) throws TException {
-    if (!tryReadLockWithTimeOut(
+    if (!tryReadLockWithTimeOutInMs(
         CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 3)) {
       return;
     }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 726bc1b6a1f..fcff317b3f3 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -125,6 +125,10 @@ public abstract class PipeTaskAgent {
     }
   }
 
+  protected boolean tryReadLockWithTimeOutInMs(final long timeOutInMs) {
+    return tryReadLockWithTimeOut(convertMsToCeilSeconds(timeOutInMs));
+  }
+
   protected void releaseReadLock() {
     pipeMetaKeeper.releaseReadLock();
   }
@@ -143,10 +147,18 @@ public abstract class PipeTaskAgent {
     }
   }
 
+  protected boolean tryWriteLockWithTimeOutInMs(final long timeOutInMs) {
+    return tryWriteLockWithTimeOut(convertMsToCeilSeconds(timeOutInMs));
+  }
+
   protected void releaseWriteLock() {
     pipeMetaKeeper.releaseWriteLock();
   }
 
+  private long convertMsToCeilSeconds(final long timeOutInMs) {
+    return Math.max(1L, (Math.max(0L, timeOutInMs) + 999L) / 1000L);
+  }
+
   ////////////////////////// Pipe Task Management Entry //////////////////////////
 
   public TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChanges(
@@ -363,7 +375,7 @@ public abstract class PipeTaskAgent {
 
   public List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChanges(
       final List<PipeMeta> pipeMetaListFromCoordinator) {
-    if (!tryWriteLockWithTimeOut(
+    if (!tryWriteLockWithTimeOutInMs(
         CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 3)) {
       return null;
     }
@@ -1091,7 +1103,7 @@ public abstract class PipeTaskAgent {
 
   public void collectPipeMetaList(final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp)
       throws TException {
-    if (!tryReadLockWithTimeOut(
+    if (!tryReadLockWithTimeOutInMs(
         CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 3)) {
       return;
     }

Reply via email to