This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new de9db9122a6 [To rel/1.2][IOTDB-6089] Improved the lock behaviour of 
the pipe heartbeat to avoid causing DataNode unknown (#10714) (#10718)
de9db9122a6 is described below

commit de9db9122a6c3611ebe9e6dac5ba8f1757cfb0f5
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jul 28 21:54:34 2023 +0800

    [To rel/1.2][IOTDB-6089] Improved the lock behaviour of the pipe heartbeat 
to avoid causing DataNode unknown (#10714) (#10718)
---
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    | 28 ++++++++++++++++++----
 .../commons/pipe/task/meta/PipeMetaKeeper.java     |  5 ++++
 2 files changed, 28 insertions(+), 5 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 2ebe4156b34..da21b757444 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -93,6 +93,16 @@ public class PipeTaskAgent {
     pipeMetaKeeper.acquireReadLock();
   }
 
+  public boolean tryReadLockWithTimeOut(long timeOutInSeconds) {
+    try {
+      return pipeMetaKeeper.tryReadLock(timeOutInSeconds);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOGGER.warn("Interruption during requiring pipeMetaKeeper lock.", e);
+      return false;
+    }
+  }
+
   private void releaseReadLock() {
     pipeMetaKeeper.releaseReadLock();
   }
@@ -705,18 +715,26 @@ public class PipeTaskAgent {
 
   public synchronized void collectPipeMetaList(THeartbeatReq req, 
THeartbeatResp resp)
       throws TException {
-    acquireReadLock();
+    // If the pipe heartbeat is separated from the cluster heartbeat, then the 
lock doesn't
+    // need to be acquired
+    if (!req.isNeedPipeMetaList()) {
+      return;
+    }
+    // Try the lock instead of directly acquire it to prevent the block of the 
cluster heartbeat
+    // 10s is the half of the HEARTBEAT_TIMEOUT_TIME defined in class 
BaseNodeCache in ConfigNode
+    if (!tryReadLockWithTimeOut(10)) {
+      return;
+    }
     try {
-      collectPipeMetaListInternal(req, resp);
+      collectPipeMetaListInternal(resp);
     } finally {
       releaseReadLock();
     }
   }
 
-  private void collectPipeMetaListInternal(THeartbeatReq req, THeartbeatResp 
resp)
-      throws TException {
+  private void collectPipeMetaListInternal(THeartbeatResp resp) throws 
TException {
     // Do nothing if data node is removing or removed, or request does not 
need pipe meta list
-    if (PipeAgent.runtime().isShutdown() || !req.isNeedPipeMetaList()) {
+    if (PipeAgent.runtime().isShutdown()) {
       return;
     }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
index 613655c1d01..85c58ecc873 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class PipeMetaKeeper {
@@ -46,6 +47,10 @@ public class PipeMetaKeeper {
     pipeMetaKeeperLock.readLock().lock();
   }
 
+  public boolean tryReadLock(long timeOut) throws InterruptedException {
+    return pipeMetaKeeperLock.readLock().tryLock(timeOut, TimeUnit.SECONDS);
+  }
+
   public void releaseReadLock() {
     pipeMetaKeeperLock.readLock().unlock();
   }

Reply via email to