This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9325134f46c [IOTDB-6089] Improved the lock behaviour of the pipe heartbeat to avoid causing DataNode unknown (#10714)
9325134f46c is described below

commit 9325134f46cdd528b78a96617aaef8e5611397f0
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jul 28 19:48:29 2023 +0800

    [IOTDB-6089] Improved the lock behaviour of the pipe heartbeat to avoid causing DataNode unknown (#10714)
---
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    | 28 ++++++++++++++++++----
 .../commons/pipe/task/meta/PipeMetaKeeper.java     |  5 ++++
 2 files changed, 28 insertions(+), 5 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 2ebe4156b34..da21b757444 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -93,6 +93,16 @@ public class PipeTaskAgent {
     pipeMetaKeeper.acquireReadLock();
   }
 
+  public boolean tryReadLockWithTimeOut(long timeOutInSeconds) {
+    try {
+      return pipeMetaKeeper.tryReadLock(timeOutInSeconds);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOGGER.warn("Interruption during requiring pipeMetaKeeper lock.", e);
+      return false;
+    }
+  }
+
   private void releaseReadLock() {
     pipeMetaKeeper.releaseReadLock();
   }
@@ -705,18 +715,26 @@ public class PipeTaskAgent {
 
   public synchronized void collectPipeMetaList(THeartbeatReq req, THeartbeatResp resp)
       throws TException {
-    acquireReadLock();
+    // If the pipe heartbeat is separated from the cluster heartbeat, then the lock doesn't
+    // need to be acquired
+    if (!req.isNeedPipeMetaList()) {
+      return;
+    }
+    // Try the lock instead of directly acquire it to prevent the block of the cluster heartbeat
+    // 10s is the half of the HEARTBEAT_TIMEOUT_TIME defined in class BaseNodeCache in ConfigNode
+    if (!tryReadLockWithTimeOut(10)) {
+      return;
+    }
     try {
-      collectPipeMetaListInternal(req, resp);
+      collectPipeMetaListInternal(resp);
     } finally {
       releaseReadLock();
     }
   }
 
-  private void collectPipeMetaListInternal(THeartbeatReq req, THeartbeatResp resp)
-      throws TException {
+  private void collectPipeMetaListInternal(THeartbeatResp resp) throws TException {
     // Do nothing if data node is removing or removed, or request does not need pipe meta list
-    if (PipeAgent.runtime().isShutdown() || !req.isNeedPipeMetaList()) {
+    if (PipeAgent.runtime().isShutdown()) {
       return;
     }
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
index 613655c1d01..85c58ecc873 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class PipeMetaKeeper {
@@ -46,6 +47,10 @@ public class PipeMetaKeeper {
     pipeMetaKeeperLock.readLock().lock();
   }
 
+  public boolean tryReadLock(long timeOut) throws InterruptedException {
+    return pipeMetaKeeperLock.readLock().tryLock(timeOut, TimeUnit.SECONDS);
+  }
+
   public void releaseReadLock() {
     pipeMetaKeeperLock.readLock().unlock();
   }

Reply via email to