This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 55232d021e9 fix tsfile checker leak (#15710)
55232d021e9 is described below
commit 55232d021e9582ed0a7e488683512addfe6f7806
Author: Peng Junzhi <[email protected]>
AuthorDate: Tue Jun 17 18:07:29 2025 +0800
fix tsfile checker leak (#15710)
---
.../pipeconsensus/PipeConsensusReceiverAgent.java | 138 +++++++++++----------
1 file changed, 75 insertions(+), 63 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
index 21bd35868ff..4b899323fb5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
@@ -47,6 +47,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
public class PipeConsensusReceiverAgent implements ConsensusPipeReceiver {
@@ -71,7 +72,8 @@ public class PipeConsensusReceiverAgent implements
ConsensusPipeReceiver {
ConsensusGroupId, Map<ConsensusPipeName,
AtomicReference<PipeConsensusReceiver>>>
replicaReceiverMap = new ConcurrentHashMap<>();
- private final Set<ConsensusPipeName> aliveReceivers = new
CopyOnWriteArraySet<>();
+ private final ReentrantReadWriteLock receiverLifeCircleLock = new
ReentrantReadWriteLock();
+ private final Set<Integer> staleRegions = new CopyOnWriteArraySet<>();
private PipeConsensus pipeConsensus;
@@ -131,43 +133,49 @@ public class PipeConsensusReceiverAgent implements
ConsensusPipeReceiver {
private PipeConsensusReceiver getReceiver(
ConsensusGroupId consensusGroupId, int leaderDataNodeId, byte
reqVersion) {
- // 1. Route to given consensusGroup's receiver map
- Map<ConsensusPipeName, AtomicReference<PipeConsensusReceiver>>
consensusPipe2ReceiverMap =
- replicaReceiverMap.computeIfAbsent(consensusGroupId, key -> new
ConcurrentHashMap<>());
- // 2. Route to given consensusPipeTask's receiver
- ConsensusPipeName consensusPipeName =
- new ConsensusPipeName(consensusGroupId, leaderDataNodeId, thisNodeId);
- AtomicBoolean isFirstGetReceiver = new AtomicBoolean(false);
- AtomicReference<PipeConsensusReceiver> receiverReference =
- consensusPipe2ReceiverMap.computeIfAbsent(
- consensusPipeName,
- key -> {
- isFirstGetReceiver.set(true);
- return new AtomicReference<>(null);
- });
-
- // 3. If not first get receiver && receiver is not alive, return null.
- if (!isFirstGetReceiver.get() &&
!aliveReceivers.contains(consensusPipeName)) {
- return null;
- }
+ // Try to not block concurrent execution of receive() while ensuring
sequential execution of
+ // creating receiver and releasing receiver by using writeReadLock.
+ receiverLifeCircleLock.readLock().lock();
+ try {
+ // If data region is stale, return null.
+ if (staleRegions.contains(consensusGroupId.getId())) {
+ return null;
+ }
+ // 1. Route to given consensusGroup's receiver map
+ Map<ConsensusPipeName, AtomicReference<PipeConsensusReceiver>>
consensusPipe2ReceiverMap =
+ replicaReceiverMap.computeIfAbsent(consensusGroupId, key -> new
ConcurrentHashMap<>());
+ // 2. Route to given consensusPipeTask's receiver
+ ConsensusPipeName consensusPipeName =
+ new ConsensusPipeName(consensusGroupId, leaderDataNodeId,
thisNodeId);
+ AtomicBoolean isFirstGetReceiver = new AtomicBoolean(false);
+ AtomicReference<PipeConsensusReceiver> receiverReference =
+ consensusPipe2ReceiverMap.computeIfAbsent(
+ consensusPipeName,
+ key -> {
+ isFirstGetReceiver.set(true);
+ return new AtomicReference<>(null);
+ });
- if (receiverReference.get() == null) {
- return internalSetAndGetReceiver(
- consensusGroupId, consensusPipeName, reqVersion, isFirstGetReceiver);
- }
+ if (receiverReference.get() == null) {
+ return internalSetAndGetReceiver(
+ consensusGroupId, consensusPipeName, reqVersion,
isFirstGetReceiver);
+ }
- final byte receiverThreadLocalVersion =
receiverReference.get().getVersion().getVersion();
- if (receiverThreadLocalVersion != reqVersion) {
- LOGGER.warn(
- "The pipeConsensus request version {} is different from the sender
request version {},"
- + " the receiver will be reset to the sender request version.",
- receiverThreadLocalVersion,
- reqVersion);
- receiverReference.set(null);
- return internalSetAndGetReceiver(
- consensusGroupId, consensusPipeName, reqVersion, isFirstGetReceiver);
+ final byte receiverThreadLocalVersion =
receiverReference.get().getVersion().getVersion();
+ if (receiverThreadLocalVersion != reqVersion) {
+ LOGGER.warn(
+ "The pipeConsensus request version {} is different from the sender
request version {},"
+ + " the receiver will be reset to the sender request version.",
+ receiverThreadLocalVersion,
+ reqVersion);
+ receiverReference.set(null);
+ return internalSetAndGetReceiver(
+ consensusGroupId, consensusPipeName, reqVersion,
isFirstGetReceiver);
+ }
+ return receiverReference.get();
+ } finally {
+ receiverLifeCircleLock.readLock().unlock();
}
- return receiverReference.get();
}
private PipeConsensusReceiver internalSetAndGetReceiver(
@@ -184,8 +192,6 @@ public class PipeConsensusReceiverAgent implements
ConsensusPipeReceiver {
if (isFirstGetReceiver.get()) {
if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
- // If is first get receiver, set this receiver as alive.
- aliveReceivers.add(consensusPipeName);
receiverReference.set(
RECEIVER_CONSTRUCTORS
.get(reqVersion)
@@ -215,34 +221,40 @@ public class PipeConsensusReceiverAgent implements
ConsensusPipeReceiver {
}
}
- /** Release all receivers of given data region */
+ /** Release all receivers of given data region when this region is deleted */
@Override
public final void releaseReceiverResource(DataRegionId dataRegionId) {
- // 1. Route to given consensusGroup's receiver map
- Map<ConsensusPipeName, AtomicReference<PipeConsensusReceiver>>
consensusPipe2ReciverMap =
- this.replicaReceiverMap.getOrDefault(
- ConsensusGroupId.Factory.create(
- TConsensusGroupType.DataRegion.getValue(),
dataRegionId.getId()),
- new ConcurrentHashMap<>());
- // 2. Release all related receivers
- consensusPipe2ReciverMap.entrySet().stream()
- .filter(entry -> entry.getKey().getReceiverDataNodeId() == thisNodeId)
- .forEach(
- receiverEntry -> {
- ConsensusPipeName consensusPipeName = receiverEntry.getKey();
- AtomicReference<PipeConsensusReceiver> receiverReference =
receiverEntry.getValue();
- if (receiverReference != null) {
- // no longer receive new request
- aliveReceivers.remove(consensusPipeName);
- receiverReference.get().handleExit();
- receiverReference.set(null);
- }
- });
- // 3. Release replica map
- this.replicaReceiverMap.remove(dataRegionId);
- // 4. GC receiver map
- consensusPipe2ReciverMap.clear();
- LOGGER.info("All Receivers related to {} are released.", dataRegionId);
+ receiverLifeCircleLock.writeLock().lock();
+ try {
+ // Mark this region as stale first, indicating that this region can not
create new receivers
+ // since it has been deleted.
+ staleRegions.add(dataRegionId.getId());
+ // 1. Route to given consensusGroup's receiver map
+ Map<ConsensusPipeName, AtomicReference<PipeConsensusReceiver>>
consensusPipe2ReciverMap =
+ this.replicaReceiverMap.getOrDefault(
+ ConsensusGroupId.Factory.create(
+ TConsensusGroupType.DataRegion.getValue(),
dataRegionId.getId()),
+ new ConcurrentHashMap<>());
+ // 2. Release all related receivers
+ consensusPipe2ReciverMap.entrySet().stream()
+ .filter(entry -> entry.getKey().getReceiverDataNodeId() ==
thisNodeId)
+ .forEach(
+ receiverEntry -> {
+ ConsensusPipeName consensusPipeName = receiverEntry.getKey();
+ AtomicReference<PipeConsensusReceiver> receiverReference =
receiverEntry.getValue();
+ if (receiverReference != null) {
+ receiverReference.get().handleExit();
+ receiverReference.set(null);
+ }
+ });
+ // 3. Release replica map
+ this.replicaReceiverMap.remove(dataRegionId);
+ // 4. GC receiver map
+ consensusPipe2ReciverMap.clear();
+ LOGGER.info("All Receivers related to {} are released.", dataRegionId);
+ } finally {
+ receiverLifeCircleLock.writeLock().unlock();
+ }
}
public final void closeReceiverExecutor() {