This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 55232d021e9 fix tsfile checker leak (#15710)
55232d021e9 is described below

commit 55232d021e9582ed0a7e488683512addfe6f7806
Author: Peng Junzhi <[email protected]>
AuthorDate: Tue Jun 17 18:07:29 2025 +0800

    fix tsfile checker leak (#15710)
---
 .../pipeconsensus/PipeConsensusReceiverAgent.java  | 138 +++++++++++----------
 1 file changed, 75 insertions(+), 63 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
index 21bd35868ff..4b899323fb5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
@@ -47,6 +47,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class PipeConsensusReceiverAgent implements ConsensusPipeReceiver {
 
@@ -71,7 +72,8 @@ public class PipeConsensusReceiverAgent implements ConsensusPipeReceiver {
           ConsensusGroupId, Map<ConsensusPipeName, 
AtomicReference<PipeConsensusReceiver>>>
       replicaReceiverMap = new ConcurrentHashMap<>();
 
-  private final Set<ConsensusPipeName> aliveReceivers = new 
CopyOnWriteArraySet<>();
+  private final ReentrantReadWriteLock receiverLifeCircleLock = new 
ReentrantReadWriteLock();
+  private final Set<Integer> staleRegions = new CopyOnWriteArraySet<>();
 
   private PipeConsensus pipeConsensus;
 
@@ -131,43 +133,49 @@ public class PipeConsensusReceiverAgent implements ConsensusPipeReceiver {
 
   private PipeConsensusReceiver getReceiver(
       ConsensusGroupId consensusGroupId, int leaderDataNodeId, byte 
reqVersion) {
-    // 1. Route to given consensusGroup's receiver map
-    Map<ConsensusPipeName, AtomicReference<PipeConsensusReceiver>> 
consensusPipe2ReceiverMap =
-        replicaReceiverMap.computeIfAbsent(consensusGroupId, key -> new 
ConcurrentHashMap<>());
-    // 2. Route to given consensusPipeTask's receiver
-    ConsensusPipeName consensusPipeName =
-        new ConsensusPipeName(consensusGroupId, leaderDataNodeId, thisNodeId);
-    AtomicBoolean isFirstGetReceiver = new AtomicBoolean(false);
-    AtomicReference<PipeConsensusReceiver> receiverReference =
-        consensusPipe2ReceiverMap.computeIfAbsent(
-            consensusPipeName,
-            key -> {
-              isFirstGetReceiver.set(true);
-              return new AtomicReference<>(null);
-            });
-
-    // 3. If not first get receiver && receiver is not alive, return null.
-    if (!isFirstGetReceiver.get() && 
!aliveReceivers.contains(consensusPipeName)) {
-      return null;
-    }
+    // Try to not block concurrent execution of receive() while ensuring 
sequential execution of
+    // creating receiver and releasing receiver by using writeReadLock.
+    receiverLifeCircleLock.readLock().lock();
+    try {
+      // If data region is stale, return null.
+      if (staleRegions.contains(consensusGroupId.getId())) {
+        return null;
+      }
+      // 1. Route to given consensusGroup's receiver map
+      Map<ConsensusPipeName, AtomicReference<PipeConsensusReceiver>> 
consensusPipe2ReceiverMap =
+          replicaReceiverMap.computeIfAbsent(consensusGroupId, key -> new 
ConcurrentHashMap<>());
+      // 2. Route to given consensusPipeTask's receiver
+      ConsensusPipeName consensusPipeName =
+          new ConsensusPipeName(consensusGroupId, leaderDataNodeId, 
thisNodeId);
+      AtomicBoolean isFirstGetReceiver = new AtomicBoolean(false);
+      AtomicReference<PipeConsensusReceiver> receiverReference =
+          consensusPipe2ReceiverMap.computeIfAbsent(
+              consensusPipeName,
+              key -> {
+                isFirstGetReceiver.set(true);
+                return new AtomicReference<>(null);
+              });
 
-    if (receiverReference.get() == null) {
-      return internalSetAndGetReceiver(
-          consensusGroupId, consensusPipeName, reqVersion, isFirstGetReceiver);
-    }
+      if (receiverReference.get() == null) {
+        return internalSetAndGetReceiver(
+            consensusGroupId, consensusPipeName, reqVersion, 
isFirstGetReceiver);
+      }
 
-    final byte receiverThreadLocalVersion = 
receiverReference.get().getVersion().getVersion();
-    if (receiverThreadLocalVersion != reqVersion) {
-      LOGGER.warn(
-          "The pipeConsensus request version {} is different from the sender 
request version {},"
-              + " the receiver will be reset to the sender request version.",
-          receiverThreadLocalVersion,
-          reqVersion);
-      receiverReference.set(null);
-      return internalSetAndGetReceiver(
-          consensusGroupId, consensusPipeName, reqVersion, isFirstGetReceiver);
+      final byte receiverThreadLocalVersion = 
receiverReference.get().getVersion().getVersion();
+      if (receiverThreadLocalVersion != reqVersion) {
+        LOGGER.warn(
+            "The pipeConsensus request version {} is different from the sender 
request version {},"
+                + " the receiver will be reset to the sender request version.",
+            receiverThreadLocalVersion,
+            reqVersion);
+        receiverReference.set(null);
+        return internalSetAndGetReceiver(
+            consensusGroupId, consensusPipeName, reqVersion, 
isFirstGetReceiver);
+      }
+      return receiverReference.get();
+    } finally {
+      receiverLifeCircleLock.readLock().unlock();
     }
-    return receiverReference.get();
   }
 
   private PipeConsensusReceiver internalSetAndGetReceiver(
@@ -184,8 +192,6 @@ public class PipeConsensusReceiverAgent implements ConsensusPipeReceiver {
 
     if (isFirstGetReceiver.get()) {
       if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
-        // If is first get receiver, set this receiver as alive.
-        aliveReceivers.add(consensusPipeName);
         receiverReference.set(
             RECEIVER_CONSTRUCTORS
                 .get(reqVersion)
@@ -215,34 +221,40 @@ public class PipeConsensusReceiverAgent implements ConsensusPipeReceiver {
     }
   }
 
-  /** Release all receivers of given data region */
+  /** Release all receivers of given data region when this region is deleted */
   @Override
   public final void releaseReceiverResource(DataRegionId dataRegionId) {
-    // 1. Route to given consensusGroup's receiver map
-    Map<ConsensusPipeName, AtomicReference<PipeConsensusReceiver>> 
consensusPipe2ReciverMap =
-        this.replicaReceiverMap.getOrDefault(
-            ConsensusGroupId.Factory.create(
-                TConsensusGroupType.DataRegion.getValue(), 
dataRegionId.getId()),
-            new ConcurrentHashMap<>());
-    // 2. Release all related receivers
-    consensusPipe2ReciverMap.entrySet().stream()
-        .filter(entry -> entry.getKey().getReceiverDataNodeId() == thisNodeId)
-        .forEach(
-            receiverEntry -> {
-              ConsensusPipeName consensusPipeName = receiverEntry.getKey();
-              AtomicReference<PipeConsensusReceiver> receiverReference = 
receiverEntry.getValue();
-              if (receiverReference != null) {
-                // no longer receive new request
-                aliveReceivers.remove(consensusPipeName);
-                receiverReference.get().handleExit();
-                receiverReference.set(null);
-              }
-            });
-    // 3. Release replica map
-    this.replicaReceiverMap.remove(dataRegionId);
-    // 4. GC receiver map
-    consensusPipe2ReciverMap.clear();
-    LOGGER.info("All Receivers related to {} are released.", dataRegionId);
+    receiverLifeCircleLock.writeLock().lock();
+    try {
+      // Mark this region as stale first, indicating that this region can not 
create new receivers
+      // since it has been deleted.
+      staleRegions.add(dataRegionId.getId());
+      // 1. Route to given consensusGroup's receiver map
+      Map<ConsensusPipeName, AtomicReference<PipeConsensusReceiver>> 
consensusPipe2ReciverMap =
+          this.replicaReceiverMap.getOrDefault(
+              ConsensusGroupId.Factory.create(
+                  TConsensusGroupType.DataRegion.getValue(), 
dataRegionId.getId()),
+              new ConcurrentHashMap<>());
+      // 2. Release all related receivers
+      consensusPipe2ReciverMap.entrySet().stream()
+          .filter(entry -> entry.getKey().getReceiverDataNodeId() == 
thisNodeId)
+          .forEach(
+              receiverEntry -> {
+                ConsensusPipeName consensusPipeName = receiverEntry.getKey();
+                AtomicReference<PipeConsensusReceiver> receiverReference = 
receiverEntry.getValue();
+                if (receiverReference != null) {
+                  receiverReference.get().handleExit();
+                  receiverReference.set(null);
+                }
+              });
+      // 3. Release replica map
+      this.replicaReceiverMap.remove(dataRegionId);
+      // 4. GC receiver map
+      consensusPipe2ReciverMap.clear();
+      LOGGER.info("All Receivers related to {} are released.", dataRegionId);
+    } finally {
+      receiverLifeCircleLock.writeLock().unlock();
+    }
   }
 
   public final void closeReceiverExecutor() {

Reply via email to