This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new de9db9122a6 [To rel/1.2][IOTDB-6089] Improved the lock behaviour of
the pipe heartbeat to avoid causing DataNode unknown (#10714) (#10718)
de9db9122a6 is described below
commit de9db9122a6c3611ebe9e6dac5ba8f1757cfb0f5
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jul 28 21:54:34 2023 +0800
[To rel/1.2][IOTDB-6089] Improved the lock behaviour of the pipe heartbeat
to avoid causing DataNode unknown (#10714) (#10718)
---
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 28 ++++++++++++++++++----
.../commons/pipe/task/meta/PipeMetaKeeper.java | 5 ++++
2 files changed, 28 insertions(+), 5 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 2ebe4156b34..da21b757444 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -93,6 +93,16 @@ public class PipeTaskAgent {
pipeMetaKeeper.acquireReadLock();
}
+ public boolean tryReadLockWithTimeOut(long timeOutInSeconds) {
+ try {
+ return pipeMetaKeeper.tryReadLock(timeOutInSeconds);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("Interruption during requiring pipeMetaKeeper lock.", e);
+ return false;
+ }
+ }
+
private void releaseReadLock() {
pipeMetaKeeper.releaseReadLock();
}
@@ -705,18 +715,26 @@ public class PipeTaskAgent {
public synchronized void collectPipeMetaList(THeartbeatReq req, THeartbeatResp resp)
throws TException {
- acquireReadLock();
+ // If the pipe heartbeat is separated from the cluster heartbeat, then the lock doesn't
+ // need to be acquired
+ if (!req.isNeedPipeMetaList()) {
+ return;
+ }
+ // Try the lock instead of directly acquire it to prevent the block of the cluster heartbeat
+ // 10s is the half of the HEARTBEAT_TIMEOUT_TIME defined in class BaseNodeCache in ConfigNode
+ if (!tryReadLockWithTimeOut(10)) {
+ return;
+ }
try {
- collectPipeMetaListInternal(req, resp);
+ collectPipeMetaListInternal(resp);
} finally {
releaseReadLock();
}
}
- private void collectPipeMetaListInternal(THeartbeatReq req, THeartbeatResp resp)
- throws TException {
+ private void collectPipeMetaListInternal(THeartbeatResp resp) throws TException {
// Do nothing if data node is removing or removed, or request does not need pipe meta list
- if (PipeAgent.runtime().isShutdown() || !req.isNeedPipeMetaList()) {
+ if (PipeAgent.runtime().isShutdown()) {
return;
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
index 613655c1d01..85c58ecc873 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class PipeMetaKeeper {
@@ -46,6 +47,10 @@ public class PipeMetaKeeper {
pipeMetaKeeperLock.readLock().lock();
}
+ public boolean tryReadLock(long timeOut) throws InterruptedException {
+ return pipeMetaKeeperLock.readLock().tryLock(timeOut, TimeUnit.SECONDS);
+ }
+
public void releaseReadLock() {
pipeMetaKeeperLock.readLock().unlock();
}