This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b4852c91c52 IoTV2: Try to fix tsfile corruption & receiver dir clean (#15410)
b4852c91c52 is described below

commit b4852c91c52ddad20815d5fb3ce027768b1b0d9b
Author: Peng Junzhi <[email protected]>
AuthorDate: Mon Apr 28 15:11:25 2025 +0800

    IoTV2: Try to fix tsfile corruption & receiver dir clean (#15410)
    
    * Try to fix tsfile corruption & receiver dir clean
    
    * improve
    
    * typo
---
 .../pipe/consensuspipe/ConsensusPipeReceiver.java  |   3 +-
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  26 --
 .../pipeconsensus/PipeConsensusReceiver.java       | 327 ++++++++++-----------
 .../pipeconsensus/PipeConsensusReceiverAgent.java  |  61 ++--
 .../iotdb/db/storageengine/StorageEngine.java      |   1 +
 5 files changed, 198 insertions(+), 220 deletions(-)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeReceiver.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeReceiver.java
index ba0c89fbc6f..d5be57682ca 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeReceiver.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeReceiver.java
@@ -19,11 +19,12 @@
 
 package org.apache.iotdb.consensus.pipe.consensuspipe;
 
+import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
 import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
 
 public interface ConsensusPipeReceiver {
   TPipeConsensusTransferResp receive(TPipeConsensusTransferReq req);
 
-  void handleDropPipeConsensusTask(ConsensusPipeName pipeName);
+  void releaseReceiverResource(DataRegionId regionId);
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 96fdbe36209..df196eabaf8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -331,28 +331,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
     PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
 
-    if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
-      // Release corresponding receiver's resource
-      PipeDataNodeAgent.receiver()
-          .pipeConsensus()
-          .handleDropPipeConsensusTask(new ConsensusPipeName(pipeName));
-    }
-
     return true;
   }
 
-  @Override
-  protected boolean createPipe(final PipeMeta pipeMetaFromCoordinator) throws 
IllegalPathException {
-    String pipeName = pipeMetaFromCoordinator.getStaticMeta().getPipeName();
-    if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
-      // Release corresponding receiver's resource
-      PipeDataNodeAgent.receiver()
-          .pipeConsensus()
-          .markConsensusPipeAsCreated(new ConsensusPipeName(pipeName));
-    }
-    return super.createPipe(pipeMetaFromCoordinator);
-  }
-
   @Override
   protected boolean dropPipe(final String pipeName) {
     // Get the pipe meta first because it is removed after 
super#dropPipe(pipeName)
@@ -369,13 +350,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
     }
 
-    if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
-      // Release corresponding receiver's resource
-      PipeDataNodeAgent.receiver()
-          .pipeConsensus()
-          .handleDropPipeConsensusTask(new ConsensusPipeName(pipeName));
-    }
-
     return true;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 5f353a61445..d01d7315b95 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -91,7 +91,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 public class PipeConsensusReceiver {
@@ -117,6 +119,7 @@ public class PipeConsensusReceiver {
   private final PipeConsensusReceiverMetrics pipeConsensusReceiverMetrics;
   private final FolderManager folderManager;
   private final AtomicBoolean isClosed = new AtomicBoolean(false);
+  private final ReadWriteLock tsFilePieceReadWriteLock = new 
ReentrantReadWriteLock(true);
 
   public PipeConsensusReceiver(
       PipeConsensus pipeConsensus,
@@ -241,6 +244,9 @@ public class PipeConsensusReceiver {
   }
 
   private TPipeConsensusTransferResp loadEvent(final TPipeConsensusTransferReq 
req) {
+    if (isClosed.get()) {
+      return 
PipeConsensusReceiverAgent.closedResp(consensusPipeName.toString(), 
req.getCommitId());
+    }
     // synchronized load event, ensured by upper caller's lock.
     try {
       final short rawRequestType = req.getType();
@@ -329,91 +335,97 @@ public class PipeConsensusReceiver {
 
   private TPipeConsensusTransferResp handleTransferFilePiece(
       final PipeConsensusTransferFilePieceReq req, final boolean isSingleFile) 
{
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug(
-          "PipeConsensus-PipeName-{}: starting to receive tsFile pieces", 
consensusPipeName);
-    }
-    long startBorrowTsFileWriterNanos = System.nanoTime();
-    PipeConsensusTsFileWriter tsFileWriter =
-        
pipeConsensusTsFileWriterPool.borrowCorrespondingWriter(req.getCommitId());
-    long startPreCheckNanos = System.nanoTime();
-    pipeConsensusReceiverMetrics.recordBorrowTsFileWriterTimer(
-        startPreCheckNanos - startBorrowTsFileWriterNanos);
-
+    tsFilePieceReadWriteLock.readLock().lock();
     try {
-      updateWritingFileIfNeeded(tsFileWriter, req.getFileName(), isSingleFile);
-      final File writingFile = tsFileWriter.getWritingFile();
-      final RandomAccessFile writingFileWriter = 
tsFileWriter.getWritingFileWriter();
-
-      if (isWritingFileOffsetNonCorrect(tsFileWriter, 
req.getStartWritingOffset())) {
-        if (!writingFile.getName().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
-          // If the file is a tsFile, then the content will not be changed for 
a specific
-          // filename. However, for other files (mod, snapshot, etc.) the 
content varies for the
-          // same name in different times, then we must rewrite the file to 
apply the newest
-          // version.
-          writingFileWriter.setLength(0);
-        }
-
-        final TSStatus status =
-            RpcUtils.getStatus(
-                TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_OFFSET_RESET,
-                String.format(
-                    "Request sender to reset file reader's offset from %s to 
%s.",
-                    req.getStartWritingOffset(), writingFileWriter.length()));
-        LOGGER.warn(
-            "PipeConsensus-PipeName-{}: File offset reset requested by 
receiver, response status = {}.",
-            consensusPipeName,
-            status);
-        return PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(
-            status, writingFileWriter.length());
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug(
+            "PipeConsensus-PipeName-{}: starting to receive tsFile pieces", 
consensusPipeName);
       }
+      long startBorrowTsFileWriterNanos = System.nanoTime();
+      PipeConsensusTsFileWriter tsFileWriter =
+          
pipeConsensusTsFileWriterPool.borrowCorrespondingWriter(req.getCommitId());
+      long startPreCheckNanos = System.nanoTime();
+      pipeConsensusReceiverMetrics.recordBorrowTsFileWriterTimer(
+          startPreCheckNanos - startBorrowTsFileWriterNanos);
 
-      long endPreCheckNanos = System.nanoTime();
-      pipeConsensusReceiverMetrics.recordTsFilePiecePreCheckTime(
-          endPreCheckNanos - startPreCheckNanos);
-      writingFileWriter.write(req.getFilePiece());
-      
pipeConsensusReceiverMetrics.recordTsFilePieceWriteTime(System.nanoTime() - 
endPreCheckNanos);
-      return PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(
-          RpcUtils.SUCCESS_STATUS, writingFileWriter.length());
-    } catch (Exception e) {
-      LOGGER.warn(
-          "PipeConsensus-PipeName-{}: Failed to write file piece from req {}.",
-          consensusPipeName,
-          req,
-          e);
-      final TSStatus status =
-          RpcUtils.getStatus(
-              TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR,
-              String.format("Failed to write file piece, because %s", 
e.getMessage()));
       try {
-        return PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(
-            status, PipeTransferFilePieceResp.ERROR_END_OFFSET);
-      } catch (IOException ex) {
-        return 
PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(status);
-      } finally {
-        // Exception may occur when disk system go wrong. At this time, we may 
reset all resource
-        // and receive this file from scratch when leader will try to resend 
this file from scratch
-        // as well.
-        closeCurrentWritingFileWriter(tsFileWriter, false);
-        deleteCurrentWritingFile(tsFileWriter);
-        // must return tsfileWriter after deleting its file.
-        try {
-          tsFileWriter.returnSelf(consensusPipeName);
-        } catch (IOException | DiskSpaceInsufficientException returnException) 
{
+        updateWritingFileIfNeeded(tsFileWriter, req.getFileName(), 
isSingleFile);
+        final File writingFile = tsFileWriter.getWritingFile();
+        final RandomAccessFile writingFileWriter = 
tsFileWriter.getWritingFileWriter();
+
+        if (isWritingFileOffsetNonCorrect(tsFileWriter, 
req.getStartWritingOffset())) {
+          if (!writingFile.getName().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
+            // If the file is a tsFile, then the content will not be changed 
for a specific
+            // filename. However, for other files (mod, snapshot, etc.) the 
content varies for the
+            // same name in different times, then we must rewrite the file to 
apply the newest
+            // version.
+            writingFileWriter.setLength(0);
+          }
+
+          final TSStatus status =
+              RpcUtils.getStatus(
+                  TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_OFFSET_RESET,
+                  String.format(
+                      "Request sender to reset file reader's offset from %s to 
%s.",
+                      req.getStartWritingOffset(), 
writingFileWriter.length()));
           LOGGER.warn(
-              "PipeConsensus-PipeName-{}: Failed to return tsFileWriter {}.",
+              "PipeConsensus-PipeName-{}: File offset reset requested by 
receiver, response status = {}.",
               consensusPipeName,
-              tsFileWriter,
-              returnException);
+              status);
+          return 
PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(
+              status, writingFileWriter.length());
+        }
+
+        long endPreCheckNanos = System.nanoTime();
+        pipeConsensusReceiverMetrics.recordTsFilePiecePreCheckTime(
+            endPreCheckNanos - startPreCheckNanos);
+        writingFileWriter.write(req.getFilePiece());
+        pipeConsensusReceiverMetrics.recordTsFilePieceWriteTime(
+            System.nanoTime() - endPreCheckNanos);
+        return PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(
+            RpcUtils.SUCCESS_STATUS, writingFileWriter.length());
+      } catch (Exception e) {
+        LOGGER.warn(
+            "PipeConsensus-PipeName-{}: Failed to write file piece from req 
{}.",
+            consensusPipeName,
+            req,
+            e);
+        final TSStatus status =
+            RpcUtils.getStatus(
+                TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR,
+                String.format("Failed to write file piece, because %s", 
e.getMessage()));
+        try {
+          return 
PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(
+              status, PipeTransferFilePieceResp.ERROR_END_OFFSET);
+        } catch (IOException ex) {
+          return 
PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(status);
+        } finally {
+          // Exception may occur when disk system go wrong. At this time, we 
may reset all resource
+          // and receive this file from scratch when leader will try to resend 
this file from
+          // scratch
+          // as well.
+          closeCurrentWritingFileWriter(tsFileWriter, false);
+          deleteCurrentWritingFile(tsFileWriter);
+          // must return tsfileWriter after deleting its file.
+          try {
+            tsFileWriter.returnSelf(consensusPipeName);
+          } catch (IOException | DiskSpaceInsufficientException 
returnException) {
+            LOGGER.warn(
+                "PipeConsensus-PipeName-{}: Failed to return tsFileWriter {}.",
+                consensusPipeName,
+                tsFileWriter,
+                returnException);
+          }
         }
       }
+    } finally {
+      tsFilePieceReadWriteLock.readLock().unlock();
     }
   }
 
   private TPipeConsensusTransferResp handleTransferFileSeal(final 
PipeConsensusTsFileSealReq req) {
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("PipeConsensus-PipeName-{}: starting to receive tsFile 
seal", consensusPipeName);
-    }
+    // TODO: turn it to debug after GA
+    LOGGER.info("PipeConsensus-PipeName-{}: starting to receive tsFile seal", 
consensusPipeName);
     long startBorrowTsFileWriterNanos = System.nanoTime();
     PipeConsensusTsFileWriter tsFileWriter =
         
pipeConsensusTsFileWriterPool.borrowCorrespondingWriter(req.getCommitId());
@@ -531,11 +543,9 @@ public class PipeConsensusReceiver {
 
   private TPipeConsensusTransferResp handleTransferFileSealWithMods(
       final PipeConsensusTsFileSealWithModReq req) {
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug(
-          "PipeConsensus-PipeName-{}: starting to receive tsFile seal with 
mods",
-          consensusPipeName);
-    }
+    // TODO: turn it to debug after GA
+    LOGGER.info(
+        "PipeConsensus-PipeName-{}: starting to receive tsFile seal with 
mods", consensusPipeName);
     long startBorrowTsFileWriterNanos = System.nanoTime();
     PipeConsensusTsFileWriter tsFileWriter =
         
pipeConsensusTsFileWriterPool.borrowCorrespondingWriter(req.getCommitId());
@@ -896,18 +906,28 @@ public class PipeConsensusReceiver {
     }
   }
 
-  private void deleteFile(File file) {
+  private void deleteFileOrDirectoryIfExists(File file, String reason) {
     if (file.exists()) {
       try {
-        RetryUtils.retryOnException(() -> FileUtils.delete(file));
+        if (file.isDirectory()) {
+          RetryUtils.retryOnException(
+              () -> {
+                FileUtils.deleteDirectory(file);
+                return null;
+              });
+        } else {
+          RetryUtils.retryOnException(() -> FileUtils.delete(file));
+        }
         LOGGER.info(
-            "PipeConsensus-PipeName-{}: Original writing file {} was deleted.",
+            "PipeConsensus-PipeName-{}: {} {} was deleted.",
             consensusPipeName,
+            reason,
             file.getPath());
       } catch (IOException e) {
         LOGGER.warn(
-            "PipeConsensus-PipeName-{}: Failed to delete original writing file 
{}, because {}.",
+            "PipeConsensus-PipeName-{}: {} Failed to delete {}, because {}.",
             consensusPipeName,
+            reason,
             file.getPath(),
             e.getMessage(),
             e);
@@ -915,8 +935,9 @@ public class PipeConsensusReceiver {
     } else {
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug(
-            "PipeConsensus-PipeName-{}: Original file {} is not existed. No 
need to delete.",
+            "PipeConsensus-PipeName-{}: {} {} is not existed. No need to 
delete.",
             consensusPipeName,
+            reason,
             file.getPath());
       }
     }
@@ -924,7 +945,9 @@ public class PipeConsensusReceiver {
 
   private void deleteCurrentWritingFile(PipeConsensusTsFileWriter 
tsFileWriter) {
     if (tsFileWriter.getWritingFile() != null) {
-      deleteFile(tsFileWriter.getWritingFile());
+      deleteFileOrDirectoryIfExists(
+          tsFileWriter.getWritingFile(),
+          String.format("TsFileWriter-%s delete current writing file", 
tsFileWriter.index));
     } else {
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug(
@@ -1007,17 +1030,8 @@ public class PipeConsensusReceiver {
                 consensusPipeName, newReceiverDir.getPath()));
       }
       // Remove exists dir
-      if (newReceiverDir.exists()) {
-        RetryUtils.retryOnException(
-            () -> {
-              FileUtils.deleteDirectory(newReceiverDir);
-              return null;
-            });
-        LOGGER.info(
-            "PipeConsensus-PipeName-{}: Origin receiver file dir {} was 
deleted.",
-            consensusPipeName,
-            newReceiverDir.getPath());
-      }
+      deleteFileOrDirectoryIfExists(newReceiverDir, "Initial Receiver: delete 
origin receive dir");
+
       if (!newReceiverDir.mkdirs()) {
         LOGGER.warn(
             "PipeConsensus-PipeName-{}: Failed to create receiver file dir {}. 
May because authority or dir already exists etc.",
@@ -1036,32 +1050,7 @@ public class PipeConsensusReceiver {
     // Clear the original receiver file dir if exists
     for (String receiverFileBaseDir : receiveDirs) {
       File receiverDir = new File(receiverFileBaseDir);
-      if (receiverDir.exists()) {
-        try {
-          RetryUtils.retryOnException(
-              () -> {
-                FileUtils.deleteDirectory(receiverDir);
-                return null;
-              });
-          LOGGER.info(
-              "PipeConsensus-PipeName-{}: Original receiver file dir {} was 
deleted successfully..",
-              consensusPipeName,
-              receiverDir.getPath());
-        } catch (IOException e) {
-          LOGGER.warn(
-              "PipeConsensus-PipeName-{}:  Failed to delete original receiver 
file dir {}",
-              consensusPipeName,
-              receiverDir.getPath(),
-              e);
-        }
-      } else {
-        if (LOGGER.isDebugEnabled()) {
-          LOGGER.debug(
-              "PipeConsensus-PipeName-{}: Receiver exit: Original receiver 
file dir {} does not exist. No need to delete.",
-              consensusPipeName,
-              receiverDir.getPath());
-        }
-      }
+      deleteFileOrDirectoryIfExists(receiverDir, "Clear receive dir manually");
     }
   }
 
@@ -1072,9 +1061,6 @@ public class PipeConsensusReceiver {
   public synchronized void handleExit() {
     // only after closing request executor, can we clean receiver.
     requestExecutor.tryClose();
-    // Clear the tsFileWriters, receiverBuffer and receiver base dirs
-    requestExecutor.clear(false);
-    clearAllReceiverBaseDir();
     // remove metric
     MetricService.getInstance().removeMetricSet(pipeConsensusReceiverMetrics);
     // cancel periodic task
@@ -1082,8 +1068,9 @@ public class PipeConsensusReceiver {
       tsFileWriterCheckerFuture.cancel(false);
       tsFileWriterCheckerFuture = null;
     }
-    LOGGER.info(
-        "PipeConsensus-PipeName-{}: Receiver exit: Receiver exited.", 
consensusPipeName.toString());
+    // Clear the tsFileWriters, receiverBuffer and receiver base dirs
+    requestExecutor.clear(false, true);
+    LOGGER.info("Receiver-{} exit successfully.", 
consensusPipeName.toString());
   }
 
   private class PipeConsensusTsFileWriterPool {
@@ -1120,7 +1107,9 @@ public class PipeConsensusReceiver {
       Optional<PipeConsensusTsFileWriter> tsFileWriter =
           pipeConsensusTsFileWriterPool.stream()
               .filter(
-                  item -> Objects.equals(commitId, 
item.getCommitIdOfCorrespondingHolderEvent()))
+                  item ->
+                      item.isUsed()
+                          && Objects.equals(commitId, 
item.getCommitIdOfCorrespondingHolderEvent()))
               .findFirst();
 
       // If the TsFileInsertionEvent is first using tsFileWriter, we will find 
the first available
@@ -1178,7 +1167,7 @@ public class PipeConsensusReceiver {
               });
     }
 
-    public void handleExit(ConsensusPipeName consensusPipeName) {
+    public void releaseAllWriters(ConsensusPipeName consensusPipeName) {
       pipeConsensusTsFileWriterPool.forEach(
           tsFileWriter -> {
             // Wait until tsFileWriter is not used by TsFileInsertionEvent or 
timeout.
@@ -1248,25 +1237,11 @@ public class PipeConsensusReceiver {
       }
 
       String localWritingDirPath = receiverBasePath + File.separator + index;
-      LOGGER.info(
-          "PipeConsensus-PipeName-{}: tsfileWriter-{} roll to writing path {}",
-          consensusPipeName,
-          index,
-          localWritingDirPath);
       this.localWritingDir = new File(localWritingDirPath);
       // Remove exists dir
-      if (this.localWritingDir.exists()) {
-        RetryUtils.retryOnException(
-            () -> {
-              FileUtils.deleteDirectory(this.localWritingDir);
-              return null;
-            });
-        LOGGER.info(
-            "PipeConsensus-PipeName-{}: Origin receiver tsFileWriter-{} file 
dir {} was deleted.",
-            consensusPipeName,
-            index,
-            this.localWritingDir.getPath());
-      }
+      deleteFileOrDirectoryIfExists(
+          this.localWritingDir,
+          String.format("TsFileWriter-%s roll to new dir and delete last 
writing dir", index));
       if (!this.localWritingDir.mkdirs()) {
         LOGGER.warn(
             "PipeConsensus-PipeName-{}: Failed to create receiver 
tsFileWriter-{} file dir {}. May because authority or dir already exists etc.",
@@ -1278,6 +1253,11 @@ public class PipeConsensusReceiver {
                 "PipeConsensus-PipeName-%s: Failed to create tsFileWriter-%d 
receiver file dir %s. May because authority or dir already exists etc.",
                 consensusPipeName, index, this.localWritingDir.getPath()));
       }
+      LOGGER.info(
+          "PipeConsensus-PipeName-{}: tsfileWriter-{} roll to writing path {}",
+          consensusPipeName,
+          index,
+          localWritingDirPath);
     }
 
     public File getLocalWritingDir() {
@@ -1290,6 +1270,13 @@ public class PipeConsensusReceiver {
 
     public void setWritingFile(File writingFile) {
       this.writingFile = writingFile;
+      // TODO: remove it into debug after GA
+      if (writingFile == null) {
+        LOGGER.info(
+            "PipeConsensus-{}: TsFileWriter-{} set null writing file",
+            consensusPipeName.toString(),
+            index);
+      }
     }
 
     public RandomAccessFile getWritingFileWriter() {
@@ -1298,6 +1285,13 @@ public class PipeConsensusReceiver {
 
     public void setWritingFileWriter(RandomAccessFile writingFileWriter) {
       this.writingFileWriter = writingFileWriter;
+      // TODO: remove it into debug after GA
+      if (writingFileWriter == null) {
+        LOGGER.info(
+            "PipeConsensus-{}: TsFileWriter-{} set null writing file writer",
+            consensusPipeName.toString(),
+            index);
+      }
     }
 
     public TCommitId getCommitIdOfCorrespondingHolderEvent() {
@@ -1367,19 +1361,8 @@ public class PipeConsensusReceiver {
 
       // close file
       if (writingFile != null) {
-        try {
-          RetryUtils.retryOnException(() -> FileUtils.delete(writingFile));
-          LOGGER.info(
-              "PipeConsensus-PipeName-{}: TsFileWriter exit: Writing file {} 
was deleted.",
-              consensusPipeName,
-              writingFile.getPath());
-        } catch (Exception e) {
-          LOGGER.warn(
-              "PipeConsensus-PipeName-{}: TsFileWriter exit: Delete writing 
file {} error.",
-              consensusPipeName,
-              writingFile.getPath(),
-              e);
-        }
+        deleteFileOrDirectoryIfExists(
+            writingFile, String.format("TsFileWriter-%s exit: delete writing 
file", this.index));
         setWritingFile(null);
       } else {
         if (LOGGER.isDebugEnabled()) {
@@ -1432,12 +1415,10 @@ public class PipeConsensusReceiver {
       long startAcquireLockNanos = System.nanoTime();
       lock.lock();
       try {
-        // once thread gets lock, it will judge whether receiver is closed
         if (isClosed.get()) {
           return PipeConsensusReceiverAgent.closedResp(
               consensusPipeName.toString(), req.getCommitId());
         }
-
         long startDispatchNanos = System.nanoTime();
         metric.recordAcquireExecutorLockTimer(startDispatchNanos - 
startAcquireLockNanos);
 
@@ -1552,12 +1533,10 @@ public class PipeConsensusReceiver {
                   !condition.await(
                       PIPE_CONSENSUS_RECEIVER_MAX_WAITING_TIME_IN_MS, 
TimeUnit.MILLISECONDS);
 
-              // once thread gets lock, it will judge whether receiver is 
closed
               if (isClosed.get()) {
                 return PipeConsensusReceiverAgent.closedResp(
                     consensusPipeName.toString(), req.getCommitId());
               }
-
               // If some reqs find the buffer no longer contains their 
requestMeta after jumping out
               // from condition.await, it may indicate that during their wait, 
some reqs with newer
               // pipeTaskStartTimes or rebootTimes came in and refreshed the 
requestBuffer. In that
@@ -1622,7 +1601,7 @@ public class PipeConsensusReceiver {
           consensusPipeName);
       // since pipe task will resend all data that hasn't synchronized after 
dataNode reboots, it's
       // safe to clear all events in buffer.
-      clear(true);
+      clear(true, false);
       // sync the follower's connectorRebootTimes with connector's actual 
rebootTimes.
       this.connectorRebootTimes = connectorRebootTimes;
       this.pipeTaskRestartTimes = 0;
@@ -1634,7 +1613,7 @@ public class PipeConsensusReceiver {
           consensusPipeName);
       // since pipe task will resend all data that hasn't synchronized after 
restarts, it's safe to
       // clear all events in buffer.
-      clear(false);
+      clear(false, false);
       this.pipeTaskRestartTimes = pipeTaskRestartTimes;
     }
 
@@ -1655,11 +1634,21 @@ public class PipeConsensusReceiver {
       }
     }
 
-    private void clear(boolean resetSyncIndex) {
-      this.reqExecutionOrderBuffer.clear();
-      this.tsFileWriterPool.handleExit(consensusPipeName);
-      if (resetSyncIndex) {
-        this.onSyncedReplicateIndex = 0;
+    private void clear(boolean resetSyncIndex, boolean cleanBaseDir) {
+      // TsFilePiece Writing may out of RequestExecutor.lock, meaning that we 
must use additional
+      // lock here to ensure serial execution of cleanup and write piece
+      tsFilePieceReadWriteLock.writeLock().lock();
+      try {
+        this.reqExecutionOrderBuffer.clear();
+        this.tsFileWriterPool.releaseAllWriters(consensusPipeName);
+        if (resetSyncIndex) {
+          this.onSyncedReplicateIndex = 0;
+        }
+        if (cleanBaseDir) {
+          clearAllReceiverBaseDir();
+        }
+      } finally {
+        tsFilePieceReadWriteLock.writeLock().unlock();
       }
     }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
index 6d3bef50a00..92af030e623 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
@@ -19,8 +19,10 @@
 
 package org.apache.iotdb.db.pipe.receiver.protocol.pipeconsensus;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
 import 
org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.request.PipeConsensusRequestVersion;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.pipe.PipeConsensus;
@@ -69,7 +71,7 @@ public class PipeConsensusReceiverAgent implements 
ConsensusPipeReceiver {
           ConsensusGroupId, Map<ConsensusPipeName, 
AtomicReference<PipeConsensusReceiver>>>
       replicaReceiverMap = new ConcurrentHashMap<>();
 
-  private final Set<ConsensusPipeName> createdConsensusPipes = new 
CopyOnWriteArraySet<>();
+  private final Set<ConsensusPipeName> aliveReceivers = new 
CopyOnWriteArraySet<>();
 
   private PipeConsensus pipeConsensus;
 
@@ -97,7 +99,7 @@ public class PipeConsensusReceiverAgent implements 
ConsensusPipeReceiver {
                 TSStatusCode.PIPE_CONSENSUS_CLOSE_ERROR,
                 "PipeConsensus receiver received a request after it was 
closed."));
     LOGGER.info(
-        "PipeConsensus-{}: receive on-the-fly no.{} event after consensus pipe 
was dropped, discard it",
+        "PipeConsensus-{}: receive on-the-fly no.{} event after data region 
was deleted, discard it",
         consensusInfo,
         tCommitId);
     return new TPipeConsensusTransferResp(status);
@@ -135,11 +137,6 @@ public class PipeConsensusReceiverAgent implements 
ConsensusPipeReceiver {
     // 2. Route to given consensusPipeTask's receiver
     ConsensusPipeName consensusPipeName =
         new ConsensusPipeName(consensusGroupId, leaderDataNodeId, thisNodeId);
-    // 3. Judge whether pipe task was dropped
-    if (!createdConsensusPipes.contains(consensusPipeName)) {
-      return null;
-    }
-
     AtomicBoolean isFirstGetReceiver = new AtomicBoolean(false);
     AtomicReference<PipeConsensusReceiver> receiverReference =
         consensusPipe2ReceiverMap.computeIfAbsent(
@@ -149,6 +146,11 @@ public class PipeConsensusReceiverAgent implements 
ConsensusPipeReceiver {
               return new AtomicReference<>(null);
             });
 
+    // 3. If not first get receiver && receiver is not alive, return null.
+    if (!isFirstGetReceiver.get() && 
!aliveReceivers.contains(consensusPipeName)) {
+      return null;
+    }
+
     if (receiverReference.get() == null) {
       return internalSetAndGetReceiver(
           consensusGroupId, consensusPipeName, reqVersion, isFirstGetReceiver);
@@ -182,10 +184,13 @@ public class PipeConsensusReceiverAgent implements 
ConsensusPipeReceiver {
 
     if (isFirstGetReceiver.get()) {
       if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
+        // If is first get receiver, set this receiver as alive.
+        aliveReceivers.add(consensusPipeName);
         receiverReference.set(
             RECEIVER_CONSTRUCTORS
                 .get(reqVersion)
                 .apply(pipeConsensus, consensusGroupId, consensusPipeName));
+        LOGGER.info("Receiver-{} is ready", consensusPipeName);
       } else {
         throw new UnsupportedOperationException(
             String.format("Unsupported pipeConsensus request version %d", 
reqVersion));
@@ -210,25 +215,33 @@ public class PipeConsensusReceiverAgent implements 
ConsensusPipeReceiver {
     }
   }
 
-  /** Release receiver of given pipeConsensusTask */
+  /** Release all receivers of given data region */
   @Override
-  public final void handleDropPipeConsensusTask(ConsensusPipeName pipeName) {
+  public final void releaseReceiverResource(DataRegionId dataRegionId) {
     // 1. Route to given consensusGroup's receiver map
     Map<ConsensusPipeName, AtomicReference<PipeConsensusReceiver>> 
consensusPipe2ReciverMap =
-        replicaReceiverMap.getOrDefault(pipeName.getConsensusGroupId(), new 
ConcurrentHashMap<>());
-    // 2. Route to given consensusPipeTask's receiver
-    AtomicReference<PipeConsensusReceiver> receiverReference =
-        consensusPipe2ReciverMap.getOrDefault(pipeName, null);
-    // 3. Release receiver
-    if (receiverReference != null) {
-      createdConsensusPipes.remove(pipeName);
-      receiverReference.get().handleExit();
-      receiverReference.set(null);
-      consensusPipe2ReciverMap.remove(pipeName);
-    }
-  }
-
-  public void markConsensusPipeAsCreated(ConsensusPipeName pipeName) {
-    createdConsensusPipes.add(pipeName);
+        this.replicaReceiverMap.getOrDefault(
+            ConsensusGroupId.Factory.create(
+                TConsensusGroupType.DataRegion.getValue(), 
dataRegionId.getId()),
+            new ConcurrentHashMap<>());
+    // 2. Release all related receivers
+    consensusPipe2ReciverMap.entrySet().stream()
+        .filter(entry -> entry.getKey().getReceiverDataNodeId() == thisNodeId)
+        .forEach(
+            receiverEntry -> {
+              ConsensusPipeName consensusPipeName = receiverEntry.getKey();
+              AtomicReference<PipeConsensusReceiver> receiverReference = 
receiverEntry.getValue();
+              if (receiverReference != null) {
+                // no longer receive new request
+                aliveReceivers.remove(consensusPipeName);
+                receiverReference.get().handleExit();
+                receiverReference.set(null);
+              }
+            });
+    // 3. Release replica map
+    this.replicaReceiverMap.remove(dataRegionId);
+    // 4. GC receiver map
+    consensusPipe2ReciverMap.clear();
+    LOGGER.info("All Receivers related to {} are released.", dataRegionId);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index d684367787d..cf24235f9ed 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -781,6 +781,7 @@ public class StorageEngine implements IService {
         region.syncDeleteDataFiles();
         region.deleteFolder(systemDir);
         region.deleteDALFolderAndClose();
+        
PipeDataNodeAgent.receiver().pipeConsensus().releaseReceiverResource(regionId);
         switch (CONFIG.getDataRegionConsensusProtocolClass()) {
           case ConsensusFactory.IOT_CONSENSUS:
           case ConsensusFactory.IOT_CONSENSUS_V2:


Reply via email to