This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a4c1273151 IoTV2: Fix closing receiver corner case regarding 
concurrently closing and receiving file. (#15271)
7a4c1273151 is described below

commit 7a4c12731515cfdc17314d135d32466bf6cc7984
Author: Peng Junzhi <[email protected]>
AuthorDate: Mon Apr 7 18:53:10 2025 +0800

    IoTV2: Fix closing receiver corner case regarding concurrently closing 
and receiving file. (#15271)
    
    * fix closing problem
    
    * fix comment
---
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |  1 +
 .../pipeconsensus/PipeConsensusReceiver.java       | 26 ++++++++++++++++++++++
 2 files changed, 27 insertions(+)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 58c9de7502c..6fb9cdcf7a3 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -312,6 +312,7 @@ public enum TSStatusCode {
   PIPE_CONSENSUS_TYPE_ERROR(2205),
   CONSENSUS_GROUP_NOT_EXIST(2206),
   RATIS_READ_UNAVAILABLE(2207),
+  PIPE_CONSENSUS_CLOSE_ERROR(2208),
   ;
 
   private final int statusCode;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 6d1a86eba34..10572e0a881 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -82,6 +82,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -106,6 +107,7 @@ public class PipeConsensusReceiver {
   private final List<String> receiveDirs = new ArrayList<>();
   private final PipeConsensusReceiverMetrics pipeConsensusReceiverMetrics;
   private final FolderManager folderManager;
+  private final AtomicBoolean isClosed = new AtomicBoolean(false);
 
   public PipeConsensusReceiver(
       PipeConsensus pipeConsensus,
@@ -1043,6 +1045,8 @@ public class PipeConsensusReceiver {
   }
 
   public synchronized void handleExit() {
+    // only after closing request executor, can we clean receiver.
+    requestExecutor.tryClose();
     // Clear the tsFileWriters and receiver base dirs
     pipeConsensusTsFileWriterPool.handleExit(consensusPipeName);
     clearAllReceiverBaseDir();
@@ -1363,6 +1367,16 @@ public class PipeConsensusReceiver {
       }
     }
 
+    private void tryClose() {
+      // It will not be closed until all requests sent before closing are done.
+      lock.lock();
+      try {
+        isClosed.set(true);
+      } finally {
+        lock.unlock();
+      }
+    }
+
     private TPipeConsensusTransferResp onRequest(
         final TPipeConsensusTransferReq req,
         final boolean isTransferTsFilePiece,
@@ -1370,6 +1384,18 @@ public class PipeConsensusReceiver {
       long startAcquireLockNanos = System.nanoTime();
       lock.lock();
       try {
+        if (isClosed.get()) {
+          final TSStatus status =
+              new TSStatus(
+                  RpcUtils.getStatus(
+                      TSStatusCode.PIPE_CONSENSUS_CLOSE_ERROR,
+                      "PipeConsensus receiver received a request after it was 
closed."));
+          LOGGER.info(
+              "PipeConsensus-PipeName-{}: received a request after receiver 
was closed and pipe task was dropped.",
+              consensusPipeName);
+          return new TPipeConsensusTransferResp(status);
+        }
+
         long startDispatchNanos = System.nanoTime();
         metric.recordAcquireExecutorLockTimer(startDispatchNanos - 
startAcquireLockNanos);
 

Reply via email to