This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0492b6a9a83 Pipe: Fix receiver can not delete dir on thread exit if 
files in dir are not loaded successfully (#12302)
0492b6a9a83 is described below

commit 0492b6a9a83f3de3aae0cd7370f5b3c6f469f8b4
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Apr 8 17:29:50 2024 +0800

    Pipe: Fix receiver can not delete dir on thread exit if files in dir are 
not loaded successfully (#12302)
---
 .../receiver/protocol/IoTDBConfigNodeReceiver.java |  31 ++-
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  20 +-
 .../resource/tsfile/PipeTsFileResourceManager.java |   1 -
 .../commons/pipe/receiver/IoTDBFileReceiver.java   | 263 ++++++++++++++-------
 4 files changed, 218 insertions(+), 97 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 28e08e231a8..6f2fb7966db 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -88,13 +88,13 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
 
   private static final AtomicInteger QUERY_ID_GENERATOR = new AtomicInteger(0);
 
-  private final ConfigManager configManager = 
ConfigNode.getInstance().getConfigManager();
-
-  private static final PipeConfigPhysicalPlanTSStatusVisitor statusVisitor =
+  private static final PipeConfigPhysicalPlanTSStatusVisitor STATUS_VISITOR =
       new PipeConfigPhysicalPlanTSStatusVisitor();
-  private static final PipeConfigPhysicalPlanExceptionVisitor exceptionVisitor 
=
+  private static final PipeConfigPhysicalPlanExceptionVisitor 
EXCEPTION_VISITOR =
       new PipeConfigPhysicalPlanExceptionVisitor();
 
+  private final ConfigManager configManager = 
ConfigNode.getInstance().getConfigManager();
+
   @Override
   public TPipeTransferResp receive(final TPipeTransferReq req) {
     try {
@@ -135,13 +135,16 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
           RpcUtils.getStatus(
               TSStatusCode.PIPE_TYPE_ERROR,
               String.format("Unsupported PipeRequestType on ConfigNode %s.", 
rawRequestType));
-      LOGGER.warn("Unsupported PipeRequestType on ConfigNode, response status 
= {}.", status);
+      LOGGER.warn(
+          "Receiver id = {}: Unsupported PipeRequestType on ConfigNode, 
response status = {}.",
+          receiverId.get(),
+          status);
       return new TPipeTransferResp(status);
     } catch (Exception e) {
       final String error =
           "Exception encountered while handling pipe transfer request. Root 
cause: "
               + e.getMessage();
-      LOGGER.warn(error, e);
+      LOGGER.warn("Receiver id = {}: {}", receiverId.get(), error, e);
       return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, 
error));
     }
   }
@@ -167,12 +170,20 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
     try {
       result = executePlan(plan);
       if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        LOGGER.warn("Failure status encountered while executing plan {}: {}", 
plan, result);
-        result = statusVisitor.process(plan, result);
+        LOGGER.warn(
+            "Receiver id = {}: Failure status encountered while executing plan 
{}: {}",
+            receiverId.get(),
+            plan,
+            result);
+        result = STATUS_VISITOR.process(plan, result);
       }
     } catch (Exception e) {
-      LOGGER.warn("Exception encountered while executing plan {}: ", plan, e);
-      result = exceptionVisitor.process(plan, e);
+      LOGGER.warn(
+          "Receiver id = {}: Exception encountered while executing plan {}: ",
+          receiverId.get(),
+          plan,
+          e);
+      result = EXCEPTION_VISITOR.process(plan, e);
     }
     return result;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 783069c566a..94f288a0f6d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -182,11 +182,14 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           RpcUtils.getStatus(
               TSStatusCode.PIPE_TYPE_ERROR,
               String.format("Unknown PipeRequestType %s.", rawRequestType));
-      LOGGER.warn("Unknown PipeRequestType, response status = {}.", status);
+      LOGGER.warn(
+          "Receiver id = {}: Unknown PipeRequestType, response status = {}.",
+          receiverId.get(),
+          status);
       return new TPipeTransferResp(status);
     } catch (IOException e) {
-      String error = String.format("Serialization error during pipe receiving, 
%s", e);
-      LOGGER.warn(error);
+      final String error = String.format("Serialization error during pipe 
receiving, %s", e);
+      LOGGER.warn("Receiver id = {}: {}", receiverId.get(), error, e);
       return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, 
error));
     }
   }
@@ -312,11 +315,18 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
         return result;
       } else {
         LOGGER.warn(
-            "Failure status encountered while executing statement {}: {}", 
statement, result);
+            "Receiver id = {}: Failure status encountered while executing 
statement {}: {}",
+            receiverId.get(),
+            statement,
+            result);
         return statement.accept(statusVisitor, result);
       }
     } catch (Exception e) {
-      LOGGER.warn("Exception encountered while executing statement {}: ", 
statement, e);
+      LOGGER.warn(
+          "Receiver id = {}: Exception encountered while executing statement 
{}: ",
+          receiverId.get(),
+          statement,
+          e);
       return statement.accept(exceptionVisitor, e);
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 33ef8b02636..5dd63847866 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -214,7 +214,6 @@ public class PipeTsFileResourceManager {
    * delete the file. if the given file is not a hardlink or copied file, do 
nothing.
    *
    * @param hardlinkOrCopiedFile the copied or hardlinked file
-   * @throws IOException when delete file failed
    */
   public void decreaseFileReference(File hardlinkOrCopiedFile) {
     lock.lock();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index fe0a2644f04..79fed02b1e5 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -37,13 +37,13 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.nio.file.Files;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicLong;
@@ -61,7 +61,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
 
   // Used to generate transfer id, which is used to identify a receiver thread.
   private static final AtomicLong RECEIVER_ID_GENERATOR = new AtomicLong(0);
-  private final AtomicLong receiverId = new AtomicLong(0);
+  protected final AtomicLong receiverId = new AtomicLong(0);
 
   private File writingFile;
   private RandomAccessFile writingFileWriter;
@@ -94,53 +94,69 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
     if (receiverFileDirWithIdSuffix.get() != null) {
       if (receiverFileDirWithIdSuffix.get().exists()) {
         try {
-          Files.delete(receiverFileDirWithIdSuffix.get().toPath());
+          FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
           LOGGER.info(
-              "Original receiver file dir {} was deleted.",
+              "Receiver id = {}: Original receiver file dir {} was deleted.",
+              receiverId.get(),
               receiverFileDirWithIdSuffix.get().getPath());
         } catch (IOException e) {
           LOGGER.warn(
-              "Failed to delete original receiver file dir {}, because {}.",
+              "Receiver id = {}: Failed to delete original receiver file dir 
{}, because {}.",
+              receiverId.get(),
               receiverFileDirWithIdSuffix.get().getPath(),
-              e.getMessage());
+              e.getMessage(),
+              e);
         }
       } else {
-        LOGGER.info(
-            "Original receiver file dir {} is not existed. No need to delete.",
-            receiverFileDirWithIdSuffix.get().getPath());
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug(
+              "Receiver id = {}: Original receiver file dir {} is not existed. 
No need to delete.",
+              receiverId.get(),
+              receiverFileDirWithIdSuffix.get().getPath());
+        }
       }
       receiverFileDirWithIdSuffix.set(null);
     } else {
-      LOGGER.info("Current receiver file dir is null. No need to delete.");
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug(
+            "Receiver id = {}: Current receiver file dir is null. No need to 
delete.",
+            receiverId.get());
+      }
     }
 
     final String receiverFileBaseDir;
     try {
       receiverFileBaseDir = getReceiverFileBaseDir();
       if (Objects.isNull(receiverFileBaseDir)) {
-        LOGGER.error(
-            "Failed to init pipe receiver file folder manager because all 
disks of folders are full.");
+        LOGGER.warn(
+            "Receiver id = {}: Failed to init pipe receiver file folder 
manager because all disks of folders are full.",
+            receiverId.get());
         return new 
TPipeTransferResp(StatusUtils.getStatus(TSStatusCode.DISK_SPACE_INSUFFICIENT));
       }
     } catch (Exception e) {
-      LOGGER.error(
-          "Fail to create pipe receiver file folder because all disks of 
folders are full.", e);
+      LOGGER.warn(
+          "Receiver id = {}: Failed to create pipe receiver file folder 
because all disks of folders are full.",
+          receiverId.get(),
+          e);
       return new 
TPipeTransferResp(StatusUtils.getStatus(TSStatusCode.DISK_SPACE_INSUFFICIENT));
     }
 
     // Create a new receiver file dir
     final File newReceiverDir = new File(receiverFileBaseDir, 
Long.toString(receiverId.get()));
-    if (!newReceiverDir.exists()) {
-      if (newReceiverDir.mkdirs()) {
-        LOGGER.info("Receiver file dir {} was created.", 
newReceiverDir.getPath());
-      } else {
-        LOGGER.error("Failed to create receiver file dir {}.", 
newReceiverDir.getPath());
-      }
+    if (!newReceiverDir.exists() && !newReceiverDir.mkdirs()) {
+      LOGGER.warn(
+          "Receiver id = {}: Failed to create receiver file dir {}.",
+          receiverId.get(),
+          newReceiverDir.getPath());
+      return new TPipeTransferResp(
+          RpcUtils.getStatus(
+              TSStatusCode.PIPE_HANDSHAKE_ERROR,
+              String.format("Failed to create receiver file dir %s.", 
newReceiverDir.getPath())));
     }
     receiverFileDirWithIdSuffix.set(newReceiverDir);
 
     LOGGER.info(
-        "Handshake successfully, receiver id = {}, receiver file dir = {}.",
+        "Receiver id = {}: Handshake successfully, receiver file dir = {}.",
         receiverId.get(),
         newReceiverDir.getPath());
     return new TPipeTransferResp(RpcUtils.SUCCESS_STATUS);
@@ -157,7 +173,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
           RpcUtils.getStatus(
               TSStatusCode.PIPE_HANDSHAKE_ERROR,
               "Receiver can not get clusterId from config node.");
-      LOGGER.warn("Handshake failed, response status = {}.", status);
+      LOGGER.warn(
+          "Receiver id = {}: Handshake failed, response status = {}.", 
receiverId.get(), status);
       return new TPipeTransferResp(status);
     }
 
@@ -168,7 +185,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       final TSStatus status =
           RpcUtils.getStatus(
               TSStatusCode.PIPE_HANDSHAKE_ERROR, "Handshake request does not 
contain clusterId.");
-      LOGGER.warn("Handshake failed, response status = {}.", status);
+      LOGGER.warn(
+          "Receiver id = {}: Handshake failed, response status = {}.", 
receiverId.get(), status);
       return new TPipeTransferResp(status);
     }
 
@@ -180,7 +198,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
               String.format(
                   "Receiver and sender are from the same cluster %s.",
                   clusterIdFromHandshakeRequest));
-      LOGGER.warn("Handshake failed, response status = {}.", status);
+      LOGGER.warn(
+          "Receiver id = {}: Handshake failed, response status = {}.", 
receiverId.get(), status);
       return new TPipeTransferResp(status);
     }
 
@@ -192,7 +211,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
           RpcUtils.getStatus(
               TSStatusCode.PIPE_HANDSHAKE_ERROR,
               "Handshake request does not contain timestampPrecision.");
-      LOGGER.warn("Handshake failed, response status = {}.", status);
+      LOGGER.warn(
+          "Receiver id = {}: Handshake failed, response status = {}.", 
receiverId.get(), status);
       return new TPipeTransferResp(status);
     }
 
@@ -237,7 +257,10 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
                 String.format(
                     "Request sender to reset file reader's offset from %s to 
%s.",
                     req.getStartWritingOffset(), writingFileWriter.length()));
-        LOGGER.warn("File offset reset requested by receiver, response status 
= {}.", status);
+        LOGGER.warn(
+            "Receiver id = {}: File offset reset requested by receiver, 
response status = {}.",
+            receiverId.get(),
+            status);
         return PipeTransferFilePieceResp.toTPipeTransferResp(status, 
writingFileWriter.length());
       }
 
@@ -246,7 +269,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       return PipeTransferFilePieceResp.toTPipeTransferResp(
           RpcUtils.SUCCESS_STATUS, writingFileWriter.length());
     } catch (Exception e) {
-      LOGGER.warn("Failed to write file piece from req {}.", req, e);
+      LOGGER.warn(
+          "Receiver id = {}: Failed to write file piece from req {}.", 
receiverId.get(), req, e);
       final TSStatus status =
           RpcUtils.getStatus(
               TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
@@ -267,8 +291,9 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
     }
 
     LOGGER.info(
-        "Writing file {} is not existed or name is not correct, try to create 
it. "
+        "Receiver id = {}: Writing file {} is not existed or name is not 
correct, try to create it. "
             + "Current writing file is {}.",
+        receiverId.get(),
         fileName,
         writingFile == null ? "null" : writingFile.getPath());
 
@@ -284,16 +309,23 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
     if (!receiverFileDirWithIdSuffix.get().exists()) {
       if (receiverFileDirWithIdSuffix.get().mkdirs()) {
         LOGGER.info(
-            "Receiver file dir {} was created.", 
receiverFileDirWithIdSuffix.get().getPath());
+            "Receiver id = {}: Receiver file dir {} was created.",
+            receiverId.get(),
+            receiverFileDirWithIdSuffix.get().getPath());
       } else {
         LOGGER.error(
-            "Failed to create receiver file dir {}.", 
receiverFileDirWithIdSuffix.get().getPath());
+            "Receiver id = {}: Failed to create receiver file dir {}.",
+            receiverId.get(),
+            receiverFileDirWithIdSuffix.get().getPath());
       }
     }
 
     writingFile = new File(receiverFileDirWithIdSuffix.get(), fileName);
     writingFileWriter = new RandomAccessFile(writingFile, "rw");
-    LOGGER.info("Writing file {} was created. Ready to write file pieces.", 
writingFile.getPath());
+    LOGGER.info(
+        "Receiver id = {}: Writing file {} was created. Ready to write file 
pieces.",
+        receiverId.get(),
+        writingFile.getPath());
   }
 
   private boolean isFileExistedAndNameCorrect(String fileName) {
@@ -305,17 +337,24 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       try {
         writingFileWriter.close();
         LOGGER.info(
-            "Current writing file writer {} was closed.",
+            "Receiver id = {}: Current writing file writer {} was closed.",
+            receiverId.get(),
             writingFile == null ? "null" : writingFile.getPath());
       } catch (IOException e) {
         LOGGER.warn(
-            "Failed to close current writing file writer {}, because {}.",
+            "Receiver id = {}: Failed to close current writing file writer {}, 
because {}.",
+            receiverId.get(),
             writingFile == null ? "null" : writingFile.getPath(),
-            e.getMessage());
+            e.getMessage(),
+            e);
       }
       writingFileWriter = null;
     } else {
-      LOGGER.info("Current writing file writer is null. No need to close.");
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug(
+            "Receiver id = {}: Current writing file writer is null. No need to 
close.",
+            receiverId.get());
+      }
     }
   }
 
@@ -323,23 +362,36 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
     if (writingFile != null) {
       deleteFile(writingFile);
     } else {
-      LOGGER.info("Current writing file is null. No need to delete.");
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug(
+            "Receiver id = {}: Current writing file is null. No need to 
delete.", receiverId.get());
+      }
     }
   }
 
   private void deleteFile(File file) {
     if (file.exists()) {
       try {
-        Files.delete(file.toPath());
-        LOGGER.info("Original writing file {} was deleted.", file.getPath());
+        FileUtils.delete(file);
+        LOGGER.info(
+            "Receiver id = {}: Original writing file {} was deleted.",
+            receiverId.get(),
+            file.getPath());
       } catch (IOException e) {
         LOGGER.warn(
-            "Failed to delete original writing file {}, because {}.",
+            "Receiver id = {}: Failed to delete original writing file {}, 
because {}.",
+            receiverId.get(),
             file.getPath(),
-            e.getMessage());
+            e.getMessage(),
+            e);
       }
     } else {
-      LOGGER.info("Original file {} is not existed. No need to delete.", 
file.getPath());
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug(
+            "Receiver id = {}: Original file {} is not existed. No need to 
delete.",
+            receiverId.get(),
+            file.getPath());
+      }
     }
   }
 
@@ -347,7 +399,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
     final boolean offsetCorrect = writingFileWriter.length() == offset;
     if (!offsetCorrect) {
       LOGGER.warn(
-          "Writing file {}'s offset is {}, but request sender's offset is {}.",
+          "Receiver id = {}: Writing file {}'s offset is {}, but request 
sender's offset is {}.",
+          receiverId.get(),
           writingFile.getPath(),
           writingFileWriter.length(),
           offset);
@@ -392,20 +445,21 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       final TSStatus status = loadFileV1(req, fileAbsolutePath);
       if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         LOGGER.info(
-            "Seal file {} successfully. Receiver id is {}.", fileAbsolutePath, 
receiverId.get());
+            "Receiver id = {}: Seal file {} successfully.", receiverId.get(), 
fileAbsolutePath);
       } else {
         LOGGER.warn(
-            "Failed to seal file {}, because {}. Receiver id is {}.",
+            "Receiver id = {}: Failed to seal file {}, because {}.",
+            receiverId.get(),
             fileAbsolutePath,
-            status.getMessage(),
-            receiverId.get());
+            status.getMessage());
       }
       return new TPipeTransferResp(status);
     } catch (IOException e) {
       LOGGER.warn(
-          String.format(
-              "Failed to seal file %s from req %s. Receiver id is %d.",
-              writingFile, req, receiverId.get()),
+          "Receiver id = {}: Failed to seal file {} from req {}.",
+          receiverId.get(),
+          writingFile,
+          req,
           e);
       return new TPipeTransferResp(
           RpcUtils.getStatus(
@@ -413,7 +467,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
               String.format("Failed to seal file %s because %s", writingFile, 
e.getMessage())));
     } finally {
       // If the writing file is not sealed successfully, the writing file will 
be deleted.
-      // All pieces of the writing file and its mod(if exists) should be 
retransmitted by the
+      // All pieces of the writing file and its mod (if exists) should be 
retransmitted by the
       // sender.
       closeCurrentWritingFileWriter();
       deleteCurrentWritingFile();
@@ -471,21 +525,18 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       final TSStatus status = loadFileV2(req, fileAbsolutePaths);
       if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         LOGGER.info(
-            "Seal file {} successfully. Receiver id is {}.", 
fileAbsolutePaths, receiverId.get());
+            "Receiver id = {}: Seal file {} successfully.", receiverId.get(), 
fileAbsolutePaths);
       } else {
         LOGGER.warn(
-            "Failed to seal file {}, status is {}. Receiver id is {}.",
+            "Receiver id = {}: Failed to seal file {}, status is {}.",
+            receiverId.get(),
             fileAbsolutePaths,
-            status,
-            receiverId.get());
+            status);
       }
       return new TPipeTransferResp(status);
     } catch (IOException | IllegalPathException e) {
       LOGGER.warn(
-          String.format(
-              "Failed to seal file %s from req %s. Receiver id is %d.",
-              writingFile, req, receiverId.get()),
-          e);
+          "Receiver id = {}: Failed to seal file {} from req {}.", 
receiverId.get(), files, req, e);
       return new TPipeTransferResp(
           RpcUtils.getStatus(
               TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
@@ -508,7 +559,10 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
           RpcUtils.getStatus(
               TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
               String.format("Failed to seal file %s, the file does not 
exist.", fileName));
-      LOGGER.warn(status.getMessage());
+      LOGGER.warn(
+          "Receiver id = {}: Failed to seal file {}, because the file does not 
exist.",
+          receiverId.get(),
+          fileName);
       return new TPipeTransferResp(status);
     }
 
@@ -520,9 +574,16 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
                   "Failed to seal file %s, because the length of file is not 
correct. "
                       + "The original file has length %s, but receiver file 
has length %s.",
                   fileName, fileLength, writingFileWriter.length()));
-      LOGGER.warn(status.getMessage());
+      LOGGER.warn(
+          "Receiver id = {}: Failed to seal file {}, because the length of 
file is not correct. "
+              + "The original file has length {}, but receiver file has length 
{}.",
+          receiverId.get(),
+          fileName,
+          fileLength,
+          writingFileWriter.length());
       return new TPipeTransferResp(status);
     }
+
     return null;
   }
 
@@ -533,8 +594,12 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
           RpcUtils.getStatus(
               TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
               String.format(
-                  "Failed to seal file %s, but writing file is %s.", fileName, 
writingFile));
-      LOGGER.warn(status.getMessage());
+                  "Failed to seal file %s, because writing file is %s.", 
fileName, writingFile));
+      LOGGER.warn(
+          "Receiver id = {}: Failed to seal file {}, because writing file is 
{}.",
+          receiverId.get(),
+          fileName,
+          writingFile);
       return new TPipeTransferResp(status);
     }
 
@@ -546,9 +611,16 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
                   "Failed to seal file %s, because the length of file is not 
correct. "
                       + "The original file has length %s, but receiver file 
has length %s.",
                   fileName, fileLength, writingFileWriter.length()));
-      LOGGER.warn(status.getMessage());
+      LOGGER.warn(
+          "Receiver id = {}: Failed to seal file {}, because the length of 
file is not correct. "
+              + "The original file has length {}, but receiver file has length 
{}.",
+          receiverId.get(),
+          fileName,
+          fileLength,
+          writingFileWriter.length());
       return new TPipeTransferResp(status);
     }
+
     return null;
   }
 
@@ -557,7 +629,9 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
         writingFile != null && writingFile.exists() && writingFileWriter != 
null;
     if (!isWritingFileAvailable) {
       LOGGER.info(
-          "Writing file {} is not available. Writing file is null: {}, writing 
file exists: {}, writing file writer is null: {}.",
+          "Receiver id = {}: Writing file {} is not available. "
+              + "Writing file is null: {}, writing file exists: {}, writing 
file writer is null: {}.",
+          receiverId.get(),
           writingFile,
           writingFile == null,
           writingFile != null && writingFile.exists(),
@@ -578,52 +652,79 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
     if (writingFileWriter != null) {
       try {
         writingFileWriter.close();
-        LOGGER.info("IoTDBFileReceiverV1#handleExit: writing file writer was 
closed.");
+        LOGGER.info(
+            "Receiver id = {}: Handling exit: Writing file writer was 
closed.", receiverId.get());
       } catch (Exception e) {
-        LOGGER.warn("IoTDBFileReceiverV1#handleExit: close writing file writer 
error.", e);
+        LOGGER.warn(
+            "Receiver id = {}: Handling exit: Close writing file writer 
error.",
+            receiverId.get(),
+            e);
       }
       writingFileWriter = null;
     } else {
-      LOGGER.info("IoTDBFileReceiverV1#handleExit: writing file writer is 
null. No need to close.");
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug(
+            "Receiver id = {}: Handling exit: Writing file writer is null. No 
need to close.",
+            receiverId.get());
+      }
     }
 
     if (writingFile != null) {
       try {
-        Files.delete(writingFile.toPath());
+        FileUtils.delete(writingFile);
         LOGGER.info(
-            "IoTDBFileReceiverV1#handleExit: writing file {} was deleted.", 
writingFile.getPath());
+            "Receiver id = {}: Handling exit: Writing file {} was deleted.",
+            receiverId.get(),
+            writingFile.getPath());
       } catch (Exception e) {
-        LOGGER.warn("IoTDBFileReceiverV1#handleExit: delete file {} error.", 
writingFile.getPath());
+        LOGGER.warn(
+            "Receiver id = {}: Handling exit: Delete writing file {} error.",
+            receiverId.get(),
+            writingFile.getPath(),
+            e);
       }
       writingFile = null;
     } else {
-      LOGGER.info("IoTDBFileReceiverV1#handleExit: writing file is null. No 
need to delete.");
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug(
+            "Receiver id = {}: Handling exit: Writing file is null. No need to 
delete.",
+            receiverId.get());
+      }
     }
 
     // Clear the original receiver file dir if exists
     if (receiverFileDirWithIdSuffix.get() != null) {
       if (receiverFileDirWithIdSuffix.get().exists()) {
         try {
-          Files.delete(receiverFileDirWithIdSuffix.get().toPath());
+          FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
           LOGGER.info(
-              "IoTDBFileReceiverV1#handleExit: original receiver file dir {} 
was deleted.",
+              "Receiver id = {}: Handling exit: Original receiver file dir {} 
was deleted.",
+              receiverId.get(),
               receiverFileDirWithIdSuffix.get().getPath());
         } catch (IOException e) {
           LOGGER.warn(
-              "IoTDBFileReceiverV1#handleExit: delete original receiver file 
dir {} error.",
-              receiverFileDirWithIdSuffix.get().getPath());
+              "Receiver id = {}: Handling exit: Delete original receiver file 
dir {} error.",
+              receiverId.get(),
+              receiverFileDirWithIdSuffix.get().getPath(),
+              e);
         }
       } else {
-        LOGGER.info(
-            "IoTDBFileReceiverV1#handleExit: original receiver file dir {} 
does not exist. No need to delete.",
-            receiverFileDirWithIdSuffix.get().getPath());
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug(
+              "Receiver id = {}: Handling exit: Original receiver file dir {} 
does not exist. No need to delete.",
+              receiverId.get(),
+              receiverFileDirWithIdSuffix.get().getPath());
+        }
       }
       receiverFileDirWithIdSuffix.set(null);
     } else {
-      LOGGER.info(
-          "IoTDBFileReceiverV1#handleExit: original receiver file dir is null. 
No need to delete.");
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug(
+            "Receiver id = {}: Handling exit: Original receiver file dir is 
null. No need to delete.",
+            receiverId.get());
+      }
     }
 
-    LOGGER.info("IoTDBFileReceiverV1#handleExit: receiver exited.");
+    LOGGER.info("Receiver id = {}: Handling exit: Receiver exited.", 
receiverId.get());
   }
 }

Reply via email to