This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new a37a18fe885 Pipe: fix bugs & smells reported by SonarCloud 
(org.apache.iotdb.db.pipe) (#10268) (#10271)
a37a18fe885 is described below

commit a37a18fe8853ee351e0b4a3e6ea53be5540e3538
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Jun 22 13:44:22 2023 +0800

    Pipe: fix bugs & smells reported by SonarCloud (org.apache.iotdb.db.pipe) 
(#10268) (#10271)
    
    (cherry picked from commit 4e8cb333130ed4ebd8c1e1a0fdf7eddda18539a1)
---
 .../db/pipe/agent/plugin/PipePluginAgent.java      |  11 +-
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |   6 +-
 .../SimpleConsensusProgressIndexAssigner.java      |   5 +-
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    | 135 +++++++++++----------
 .../pipe/collector/IoTDBDataRegionCollector.java   |  15 ++-
 .../PipeHistoricalDataRegionCollector.java         |   4 +-
 .../PipeHistoricalDataRegionTsFileCollector.java   |   8 +-
 .../realtime/PipeRealtimeDataRegionCollector.java  |   2 +-
 .../PipeRealtimeDataRegionFakeCollector.java       |  21 +++-
 .../PipeRealtimeDataRegionHybridCollector.java     |  27 +++--
 .../PipeRealtimeDataRegionLogCollector.java        |   7 +-
 .../PipeRealtimeDataRegionTsFileCollector.java     |   7 +-
 .../realtime/epoch/TsFileEpochManager.java         |  11 +-
 .../matcher/CachedSchemaPatternMatcher.java        |   4 +-
 .../pipe/connector/legacy/IoTDBSyncConnector.java  |  56 ++++-----
 .../pipe/connector/legacy/IoTDBSyncReceiver.java   |  37 ++----
 .../legacy/pipedata/DeletionPipeData.java          |   8 +-
 .../pipe/connector/legacy/pipedata/PipeData.java   |   5 +-
 .../connector/legacy/pipedata/TsFilePipeData.java  |   3 +-
 .../pipe/connector/v1/IoTDBThriftConnectorV1.java  |  29 ++---
 .../pipe/connector/v1/IoTDBThriftReceiverV1.java   |  25 ++--
 .../db/pipe/connector/v1/PipeRequestType.java      |   5 +-
 .../PipeTransferTabletInsertionEventHandler.java   |   4 +-
 .../PipeTransferTsFileInsertionEventHandler.java   |   2 +-
 .../apache/iotdb/db/pipe/event/EnrichedEvent.java  |   2 +-
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |   3 -
 .../common/tablet/PipeRawTabletInsertionEvent.java |  10 +-
 .../tablet/TabletInsertionDataContainer.java       |  27 ++---
 .../common/tsfile/PipeTsFileInsertionEvent.java    |   4 +-
 .../tsfile/TsFileInsertionDataContainer.java       |   7 +-
 .../executor/PipeSubtaskExecutorManager.java       |   6 +-
 .../resource/file/PipeFileResourceManager.java     |  22 ++--
 .../db/pipe/resource/wal/PipeWALResource.java      |  10 +-
 .../pipe/resource/wal/PipeWALResourceManager.java  |  79 ++++--------
 .../db/pipe/task/connection/EventSupplier.java     |   1 +
 .../db/pipe/task/stage/PipeTaskCollectorStage.java |   3 +-
 .../db/pipe/task/stage/PipeTaskConnectorStage.java |   4 +-
 .../iotdb/db/pipe/task/stage/PipeTaskStage.java    |  25 ++--
 .../db/pipe/task/subtask/PipeConnectorSubtask.java |  42 +++++--
 .../task/subtask/PipeConnectorSubtaskManager.java  |  13 +-
 .../db/pipe/task/subtask/PipeProcessorSubtask.java |   1 -
 .../iotdb/db/pipe/task/subtask/PipeSubtask.java    |  46 +++++--
 42 files changed, 381 insertions(+), 361 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
index 9266bdf9a22..5a05cb43b53 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipePluginAgent.java
@@ -137,7 +137,8 @@ public class PipePluginAgent {
       updateAllRegisteredClasses(currentActiveClassLoader);
 
       final Class<?> pluginClass = Class.forName(className, true, 
currentActiveClassLoader);
-      // ensure that it is a PipePlugin class
+
+      @SuppressWarnings("unused") // ensure that it is a PipePlugin class
       final PipePlugin ignored = (PipePlugin) 
pluginClass.getDeclaredConstructor().newInstance();
 
       pipePluginMetaKeeper.addPipePluginMeta(pluginName, pipePluginMeta);
@@ -165,7 +166,7 @@ public class PipePluginAgent {
     }
   }
 
-  public void deregister(String pluginName, boolean needToDeleteJar) throws 
Exception {
+  public void deregister(String pluginName, boolean needToDeleteJar) throws 
PipeException {
     acquireLock();
     try {
       final PipePluginMeta information = 
pipePluginMetaKeeper.getPipePluginMeta(pluginName);
@@ -187,6 +188,8 @@ public class PipePluginAgent {
         PipePluginExecutableManager.getInstance()
             .removeFileUnderTemporaryRoot(pluginName.toUpperCase() + ".txt");
       }
+    } catch (IOException e) {
+      throw new PipeException(e.getMessage(), e);
     } finally {
       releaseLock();
     }
@@ -225,7 +228,7 @@ public class PipePluginAgent {
               "Failed to reflect PipePlugin instance, because PipePlugin %s 
has not been registered.",
               pluginName.toUpperCase());
       LOGGER.warn(errorMessage);
-      throw new RuntimeException(errorMessage);
+      throw new PipeException(errorMessage);
     }
 
     try {
@@ -240,7 +243,7 @@ public class PipePluginAgent {
               "Failed to reflect PipePlugin %s(%s) instance, because %s",
               pluginName, information.getClassName(), e);
       LOGGER.warn(errorMessage, e);
-      throw new RuntimeException(errorMessage);
+      throw new PipeException(errorMessage);
     }
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 7d1a6336a16..afc899854d0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -102,9 +102,9 @@ public class PipeRuntimeAgent implements IService {
 
   public void report(PipeTaskMeta pipeTaskMeta, PipeRuntimeException 
pipeRuntimeException) {
     LOGGER.warn(
-        String.format(
-            "PipeRuntimeException: pipe task meta %s, exception %s",
-            pipeTaskMeta, pipeRuntimeException),
+        "Report PipeRuntimeException to local PipeTaskMeta({}), exception 
message: {}",
+        pipeTaskMeta,
+        pipeRuntimeException.getMessage(),
         pipeRuntimeException);
     pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
index 7c81cd6cabd..b0a3eab9166 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
@@ -84,7 +85,7 @@ public class SimpleConsensusProgressIndexAssigner {
       return;
     }
     try {
-      String content = FileUtils.readFileToString(file, "UTF-8");
+      String content = FileUtils.readFileToString(file, 
StandardCharsets.UTF_8);
       rebootTimes = Integer.parseInt(content);
     } catch (IOException e) {
       LOGGER.error("Cannot parse reboot times from file {}", 
file.getAbsolutePath(), e);
@@ -94,7 +95,7 @@ public class SimpleConsensusProgressIndexAssigner {
 
   private void recordRebootTimes() throws IOException {
     File file = SystemFileFactory.INSTANCE.getFile(PIPE_SYSTEM_DIR + 
REBOOT_TIMES_FILE_NAME);
-    FileUtils.writeStringToFile(file, String.valueOf(rebootTimes + 1), 
"UTF-8");
+    FileUtils.writeStringToFile(file, String.valueOf(rebootTimes + 1), 
StandardCharsets.UTF_8);
   }
 
   public void assignIfNeeded(TsFileResource tsFileResource) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index e3fe56a52ec..bbbbd3a7d23 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -71,6 +71,9 @@ public class PipeTaskAgent {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTaskAgent.class);
   private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
 
+  private static final String MESSAGE_UNKNOWN_PIPE_STATUS = "Unknown pipe 
status %s for pipe %s";
+  private static final String MESSAGE_UNEXPECTED_PIPE_STATUS = "Unexpected 
pipe status %s: ";
+
   private final PipeMetaKeeper pipeMetaKeeper;
   private final PipeTaskManager pipeTaskManager;
 
@@ -238,8 +241,7 @@ public class PipeTaskAgent {
         } else {
           throw new IllegalStateException(
               String.format(
-                  "Unknown pipe status %s for pipe %s",
-                  statusOnDataNode, pipeStaticMeta.getPipeName()));
+                  MESSAGE_UNKNOWN_PIPE_STATUS, statusOnDataNode, 
pipeStaticMeta.getPipeName()));
         }
         break;
       case STOPPED:
@@ -248,8 +250,7 @@ public class PipeTaskAgent {
         } else {
           throw new IllegalStateException(
               String.format(
-                  "Unknown pipe status %s for pipe %s",
-                  statusOnDataNode, pipeStaticMeta.getPipeName()));
+                  MESSAGE_UNKNOWN_PIPE_STATUS, statusOnDataNode, 
pipeStaticMeta.getPipeName()));
         }
         break;
       case DROPPED:
@@ -259,8 +260,7 @@ public class PipeTaskAgent {
       default:
         throw new IllegalStateException(
             String.format(
-                "Unknown pipe status %s for pipe %s",
-                statusFromConfigNode, pipeStaticMeta.getPipeName()));
+                MESSAGE_UNKNOWN_PIPE_STATUS, statusFromConfigNode, 
pipeStaticMeta.getPipeName()));
     }
   }
 
@@ -296,27 +296,33 @@ public class PipeTaskAgent {
     final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
     if (existedPipeMeta != null) {
       if (existedPipeMeta.getStaticMeta().getCreationTime() == creationTime) {
-        switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) {
+        final PipeStatus status = 
existedPipeMeta.getRuntimeMeta().getStatus().get();
+        switch (status) {
           case STOPPED:
           case RUNNING:
-            LOGGER.info(
-                "Pipe {} (creation time = {}) has already been created. 
Current status = {}. Skip creating.",
-                pipeName,
-                creationTime,
-                existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+            if (LOGGER.isInfoEnabled()) {
+              LOGGER.info(
+                  "Pipe {} (creation time = {}) has already been created. 
Current status = {}. Skip creating.",
+                  pipeName,
+                  creationTime,
+                  status.name());
+            }
             return false;
           case DROPPED:
-            LOGGER.info(
-                "Pipe {} (creation time = {}) has already been dropped, but 
the pipe task meta has not been cleaned up. "
-                    + "Current status = {}. Try dropping the pipe and 
recreating it.",
-                pipeName,
-                creationTime,
-                existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+            if (LOGGER.isInfoEnabled()) {
+              LOGGER.info(
+                  "Pipe {} (creation time = {}) has already been dropped, but 
the pipe task meta has not been cleaned up. "
+                      + "Current status = {}. Try dropping the pipe and 
recreating it.",
+                  pipeName,
+                  creationTime,
+                  status.name());
+            }
             // break to drop the pipe and recreate it
             break;
           default:
             throw new IllegalStateException(
-                "Unexpected status: " + 
existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+                MESSAGE_UNEXPECTED_PIPE_STATUS
+                    + 
existedPipeMeta.getRuntimeMeta().getStatus().get().name());
         }
       }
 
@@ -440,31 +446,39 @@ public class PipeTaskAgent {
       return;
     }
 
-    switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) {
+    final PipeStatus status = 
existedPipeMeta.getRuntimeMeta().getStatus().get();
+    switch (status) {
       case STOPPED:
-        LOGGER.info(
-            "Pipe {} (creation time = {}) has been created. Current status = 
{}. Starting.",
-            pipeName,
-            creationTime,
-            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        if (LOGGER.isInfoEnabled()) {
+          LOGGER.info(
+              "Pipe {} (creation time = {}) has been created. Current status = 
{}. Starting.",
+              pipeName,
+              creationTime,
+              status.name());
+        }
         break;
       case RUNNING:
-        LOGGER.info(
-            "Pipe {} (creation time = {}) has already been started. Current 
status = {}. Skip starting.",
-            pipeName,
-            creationTime,
-            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        if (LOGGER.isInfoEnabled()) {
+          LOGGER.info(
+              "Pipe {} (creation time = {}) has already been started. Current 
status = {}. Skip starting.",
+              pipeName,
+              creationTime,
+              status.name());
+        }
         return;
       case DROPPED:
-        LOGGER.info(
-            "Pipe {} (creation time = {}) has already been dropped. Current 
status = {}. Skip starting.",
-            pipeName,
-            creationTime,
-            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        if (LOGGER.isInfoEnabled()) {
+          LOGGER.info(
+              "Pipe {} (creation time = {}) has already been dropped. Current 
status = {}. Skip starting.",
+              pipeName,
+              creationTime,
+              status.name());
+        }
         return;
       default:
         throw new IllegalStateException(
-            "Unexpected status: " + 
existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+            MESSAGE_UNEXPECTED_PIPE_STATUS
+                + existedPipeMeta.getRuntimeMeta().getStatus().get().name());
     }
 
     // trigger start() method for each pipe task
@@ -510,31 +524,37 @@ public class PipeTaskAgent {
       return;
     }
 
-    switch (existedPipeMeta.getRuntimeMeta().getStatus().get()) {
+    final PipeStatus status = 
existedPipeMeta.getRuntimeMeta().getStatus().get();
+    switch (status) {
       case STOPPED:
-        LOGGER.info(
-            "Pipe {} (creation time = {}) has already been stopped. Current 
status = {}. Skip stopping.",
-            pipeName,
-            creationTime,
-            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        if (LOGGER.isInfoEnabled()) {
+          LOGGER.info(
+              "Pipe {} (creation time = {}) has already been stopped. Current 
status = {}. Skip stopping.",
+              pipeName,
+              creationTime,
+              status.name());
+        }
         return;
       case RUNNING:
-        LOGGER.info(
-            "Pipe {} (creation time = {}) has been started. Current status = 
{}. Stopping.",
-            pipeName,
-            creationTime,
-            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        if (LOGGER.isInfoEnabled()) {
+          LOGGER.info(
+              "Pipe {} (creation time = {}) has been started. Current status = 
{}. Stopping.",
+              pipeName,
+              creationTime,
+              status.name());
+        }
         break;
       case DROPPED:
-        LOGGER.info(
-            "Pipe {} (creation time = {}) has already been dropped. Current 
status = {}. Skip stopping.",
-            pipeName,
-            creationTime,
-            existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        if (LOGGER.isInfoEnabled()) {
+          LOGGER.info(
+              "Pipe {} (creation time = {}) has already been dropped. Current 
status = {}. Skip stopping.",
+              pipeName,
+              creationTime,
+              status.name());
+        }
         return;
       default:
-        throw new IllegalStateException(
-            "Unexpected status: " + 
existedPipeMeta.getRuntimeMeta().getStatus().get().name());
+        throw new IllegalStateException(MESSAGE_UNEXPECTED_PIPE_STATUS + 
status.name());
     }
 
     // trigger stop() method for each pipe task
@@ -593,13 +613,6 @@ public class PipeTaskAgent {
     }
   }
 
-  private void stopPipeTask(TConsensusGroupId dataRegionGroupId, 
PipeStaticMeta pipeStaticMeta) {
-    final PipeTask pipeTask = pipeTaskManager.getPipeTask(pipeStaticMeta, 
dataRegionGroupId);
-    if (pipeTask != null) {
-      pipeTask.stop();
-    }
-  }
-
   ///////////////////////// Heartbeat /////////////////////////
 
   public synchronized void collectPipeMetaList(THeartbeatReq req, 
THeartbeatResp resp)
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java
index d1653b30c59..cbe7fee083e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/IoTDBDataRegionCollector.java
@@ -90,14 +90,14 @@ public class IoTDBDataRegionCollector implements 
PipeCollector {
           COLLECTOR_REALTIME_MODE_LOG);
     }
 
-    constructHistoricalCollector(validator.getParameters());
+    constructHistoricalCollector();
     constructRealtimeCollector(validator.getParameters());
 
     historicalCollector.validate(validator);
     realtimeCollector.validate(validator);
   }
 
-  private void constructHistoricalCollector(PipeParameters parameters) {
+  private void constructHistoricalCollector() {
     // enable historical collector by default
     historicalCollector = new PipeHistoricalDataRegionTsFileCollector();
   }
@@ -128,9 +128,8 @@ public class IoTDBDataRegionCollector implements 
PipeCollector {
       default:
         realtimeCollector = new PipeRealtimeDataRegionHybridCollector();
         LOGGER.warn(
-            String.format(
-                "Unsupported collector realtime mode: %s, create a hybrid 
collector.",
-                parameters.getString(COLLECTOR_REALTIME_MODE)));
+            "Unsupported collector realtime mode: {}, create a hybrid 
collector.",
+            parameters.getString(COLLECTOR_REALTIME_MODE));
     }
   }
 
@@ -152,7 +151,7 @@ public class IoTDBDataRegionCollector implements 
PipeCollector {
     hasBeenStarted.set(true);
 
     final AtomicReference<Exception> exceptionHolder = new 
AtomicReference<>(null);
-    final DataRegionId dataRegionId = new DataRegionId(this.dataRegionId);
+    final DataRegionId dataRegionIdObject = new 
DataRegionId(this.dataRegionId);
     while (true) {
       // try to start collectors in the data region ...
       // first try to run if data region exists, then try to run if data 
region does not exist.
@@ -160,7 +159,7 @@ public class IoTDBDataRegionCollector implements 
PipeCollector {
       // runIfPresent and runIfAbsent operations. in this case, we need to 
retry.
       if (StorageEngine.getInstance()
               .runIfPresent(
-                  dataRegionId,
+                  dataRegionIdObject,
                   (dataRegion -> {
                     dataRegion.writeLock(
                         String.format(
@@ -173,7 +172,7 @@ public class IoTDBDataRegionCollector implements 
PipeCollector {
                   }))
           || StorageEngine.getInstance()
               .runIfAbsent(
-                  dataRegionId,
+                  dataRegionIdObject,
                   () -> 
startHistoricalCollectorAndRealtimeCollector(exceptionHolder))) {
         rethrowExceptionIfAny(exceptionHolder);
         return;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionCollector.java
index beb96f8f26b..08d5684851e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionCollector.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.collector.historical;
 
 import org.apache.iotdb.pipe.api.PipeCollector;
 
-public abstract class PipeHistoricalDataRegionCollector implements 
PipeCollector {
+public interface PipeHistoricalDataRegionCollector extends PipeCollector {
 
-  public abstract boolean hasConsumedAll();
+  boolean hasConsumedAll();
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index 7d4bc25f2dc..6ade5ca4ad8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -50,7 +50,7 @@ import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COL
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeCollectorConstant.COLLECTOR_PATTERN_KEY;
 
-public class PipeHistoricalDataRegionTsFileCollector extends 
PipeHistoricalDataRegionCollector {
+public class PipeHistoricalDataRegionTsFileCollector implements 
PipeHistoricalDataRegionCollector {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileCollector.class);
@@ -69,10 +69,10 @@ public class PipeHistoricalDataRegionTsFileCollector 
extends PipeHistoricalDataR
 
   private Queue<PipeTsFileInsertionEvent> pendingQueue;
 
-  public PipeHistoricalDataRegionTsFileCollector() {}
-
   @Override
-  public void validate(PipeParameterValidator validator) {}
+  public void validate(PipeParameterValidator validator) {
+    // do nothing
+  }
 
   @Override
   public void customize(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java
index ab9be42b062..4e6e98c168e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionCollector.java
@@ -35,7 +35,7 @@ public abstract class PipeRealtimeDataRegionCollector 
implements PipeCollector {
   protected String dataRegionId;
   protected PipeTaskMeta pipeTaskMeta;
 
-  public PipeRealtimeDataRegionCollector() {}
+  protected PipeRealtimeDataRegionCollector() {}
 
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java
index fbc28c4dbca..6051930e4e9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionFakeCollector.java
@@ -26,15 +26,22 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 
 public class PipeRealtimeDataRegionFakeCollector extends 
PipeRealtimeDataRegionCollector {
+
   @Override
-  public void validate(PipeParameterValidator validator) {}
+  public void validate(PipeParameterValidator validator) {
+    // do nothing
+  }
 
   @Override
   public void customize(
-      PipeParameters parameters, PipeCollectorRuntimeConfiguration 
configuration) {}
+      PipeParameters parameters, PipeCollectorRuntimeConfiguration 
configuration) {
+    // do nothing
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // do nothing
+  }
 
   @Override
   public Event supply() {
@@ -42,7 +49,9 @@ public class PipeRealtimeDataRegionFakeCollector extends 
PipeRealtimeDataRegionC
   }
 
   @Override
-  public void collect(PipeRealtimeCollectEvent event) {}
+  public void collect(PipeRealtimeCollectEvent event) {
+    // do nothing
+  }
 
   @Override
   public boolean isNeedListenToTsFile() {
@@ -55,7 +64,9 @@ public class PipeRealtimeDataRegionFakeCollector extends 
PipeRealtimeDataRegionC
   }
 
   @Override
-  public void close() {}
+  public void close() {
+    // do nothing
+  }
 
   @Override
   public String toString() {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 1551fc2a582..eeca9a82dad 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -80,15 +80,15 @@ public class PipeRealtimeDataRegionHybridCollector extends 
PipeRealtimeDataRegio
       return;
     }
 
-    if 
(!event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)) 
{
-      if (!pendingQueue.offer(event)) {
-        LOGGER.warn(
-            String.format(
-                "collectTabletInsertion: pending queue of 
PipeRealtimeDataRegionHybridCollector %s has reached capacity, discard tablet 
event %s, current state %s",
-                this, event, event.getTsFileEpoch().getState(this)));
-        // this would not happen, but just in case.
-        // UnboundedBlockingPendingQueue is unbounded, so it should never 
reach capacity.
-      }
+    if 
(!event.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE)
+        && !pendingQueue.offer(event)) {
+      LOGGER.warn(
+          "collectTabletInsertion: pending queue of 
PipeRealtimeDataRegionHybridCollector {} has reached capacity, discard tablet 
event {}, current state {}",
+          this,
+          event,
+          event.getTsFileEpoch().getState(this));
+      // this would not happen, but just in case.
+      // UnboundedBlockingPendingQueue is unbounded, so it should never reach 
capacity.
     }
   }
 
@@ -102,9 +102,10 @@ public class PipeRealtimeDataRegionHybridCollector extends 
PipeRealtimeDataRegio
 
     if (!pendingQueue.offer(event)) {
       LOGGER.warn(
-          String.format(
-              "collectTsFileInsertion: pending queue of 
PipeRealtimeDataRegionHybridCollector %s has reached capacity, discard TsFile 
event %s, current state %s",
-              this, event, event.getTsFileEpoch().getState(this)));
+          "collectTsFileInsertion: pending queue of 
PipeRealtimeDataRegionHybridCollector {} has reached capacity, discard TsFile 
event {}, current state {}",
+          this,
+          event,
+          event.getTsFileEpoch().getState(this));
       // this would not happen, but just in case.
       // ListenableUnblockingPendingQueue is unbounded, so it should never 
reach capacity.
     }
@@ -163,7 +164,7 @@ public class PipeRealtimeDataRegionHybridCollector extends 
PipeRealtimeDataRegio
         // this event is not reliable anymore. but the data represented by 
this event
         // has been carried by the following tsfile event, so we can just 
discard this event.
         event.getTsFileEpoch().migrateState(this, state -> 
TsFileEpoch.State.USING_TSFILE);
-        LOGGER.warn(String.format("Increase reference count for event %s 
error.", event));
+        LOGGER.warn("Increase reference count for event {} error.", event);
         return null;
       }
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java
index 42da6a5d6fc..2d02d4fba78 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionLogCollector.java
@@ -53,9 +53,10 @@ public class PipeRealtimeDataRegionLogCollector extends 
PipeRealtimeDataRegionCo
 
     if (!pendingQueue.offer(event)) {
       LOGGER.warn(
-          String.format(
-              "collect: pending queue of PipeRealtimeDataRegionLogCollector %s 
has reached capacity, discard tablet event %s, current state %s",
-              this, event, event.getTsFileEpoch().getState(this)));
+          "collect: pending queue of PipeRealtimeDataRegionLogCollector {} has 
reached capacity, discard tablet event {}, current state {}",
+          this,
+          event,
+          event.getTsFileEpoch().getState(this));
       // this would not happen, but just in case.
       // ListenableUnblockingPendingQueue is unbounded, so it should never 
reach capacity.
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
index cd56caae708..f43582a5541 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/PipeRealtimeDataRegionTsFileCollector.java
@@ -53,9 +53,10 @@ public class PipeRealtimeDataRegionTsFileCollector extends 
PipeRealtimeDataRegio
 
     if (!pendingQueue.offer(event)) {
       LOGGER.warn(
-          String.format(
-              "collect: pending queue of PipeRealtimeDataRegionTsFileCollector 
%s has reached capacity, discard TsFile event %s, current state %s",
-              this, event, event.getTsFileEpoch().getState(this)));
+          "collect: pending queue of PipeRealtimeDataRegionTsFileCollector {} 
has reached capacity, discard TsFile event {}, current state {}",
+          this,
+          event,
+          event.getTsFileEpoch().getState(this));
       // this would not happen, but just in case.
       // ListenableUnblockingPendingQueue is unbounded, so it should never 
reach capacity.
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochManager.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochManager.java
index 095bac51c2b..dbecc2f2a72 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/epoch/TsFileEpochManager.java
@@ -46,11 +46,12 @@ public class TsFileEpochManager {
     final String filePath = resource.getTsFilePath();
 
     // this would not happen, but just in case
-    if (!filePath2Epoch.containsKey(filePath)) {
-      LOGGER.info(
-          String.format("Pipe: can not find TsFileEpoch for TsFile %s, 
creating it", filePath));
-      filePath2Epoch.put(filePath, new TsFileEpoch(filePath));
-    }
+    filePath2Epoch.computeIfAbsent(
+        filePath,
+        path -> {
+          LOGGER.info("TsFileEpoch not found for TsFile {}, creating a new 
one", path);
+          return new TsFileEpoch(path);
+        });
 
     return new PipeRealtimeCollectEvent(
         event,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java
index cc3dd94a67c..14df36e9267 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/collector/realtime/matcher/CachedSchemaPatternMatcher.java
@@ -84,8 +84,6 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
     }
   }
 
-  // TODO: maximum the efficiency of matching when pattern is root
-  // TODO: memory control
   @Override
   public Set<PipeRealtimeDataRegionCollector> match(PipeRealtimeCollectEvent 
event) {
     final Set<PipeRealtimeDataRegionCollector> matchedCollectors = new 
HashSet<>();
@@ -105,7 +103,7 @@ public class CachedSchemaPatternMatcher implements 
PipeDataRegionMatcher {
             deviceToCollectorsCache.get(device, 
this::filterCollectorsByDevice);
         // this would not happen
         if (collectorsFilteredByDevice == null) {
-          LOGGER.warn(String.format("Match result NPE when handle device %s", 
device));
+          LOGGER.warn("Match result NPE when handle device {}", device);
           continue;
         }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java
index ecb03c87a7f..eda9002a226 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncConnector.java
@@ -86,7 +86,7 @@ public class IoTDBSyncConnector implements PipeConnector {
 
   private IoTDBThriftConnectorClient client;
 
-  private static SessionPool sessionPool;
+  private SessionPool sessionPool;
 
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
@@ -143,8 +143,10 @@ public class IoTDBSyncConnector implements PipeConnector {
         throw new PipeRuntimeCriticalException(errorMsg);
       }
     } catch (TException e) {
-      LOGGER.warn(String.format("Connect to receiver %s:%s error.", ipAddress, 
port), e);
-      throw new PipeConnectionException(e.getMessage(), e);
+      throw new PipeConnectionException(
+          String.format(
+              "Connect to receiver %s:%s error, because: %s", ipAddress, port, 
e.getMessage()),
+          e);
     }
 
     sessionPool =
@@ -158,27 +160,19 @@ public class IoTDBSyncConnector implements PipeConnector {
   }
 
   @Override
-  public void heartbeat() throws Exception {}
+  public void heartbeat() throws Exception {
+    // do nothing
+  }
 
   @Override
   public void transfer(TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
-    try {
-      if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
-        doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
-      } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
-        doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent);
-      } else {
-        throw new NotImplementedException(
-            "IoTDBSyncConnector only support PipeInsertNodeInsertionEvent and 
PipeTabletInsertionEvent.");
-      }
-    } catch (TException e) {
-      LOGGER.warn(
-          "Network error when transfer tablet insertion event: {}.", 
tabletInsertionEvent, e);
-      // the connection may be broken, try to reconnect by catching 
PipeConnectionException
-      throw new PipeConnectionException(
-          String.format(
-              "Network error when transfer tablet insertion event, because 
%s.", e.getMessage()),
-          e);
+    if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+      doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
+    } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
+      doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent);
+    } else {
+      throw new NotImplementedException(
+          "IoTDBSyncConnector only support PipeInsertNodeInsertionEvent and 
PipeTabletInsertionEvent.");
     }
   }
 
@@ -193,7 +187,7 @@ public class IoTDBSyncConnector implements PipeConnector {
   }
 
   private void doTransfer(PipeRawTabletInsertionEvent pipeTabletInsertionEvent)
-      throws PipeException, TException, IoTDBConnectionException, 
StatementExecutionException {
+      throws PipeException, IoTDBConnectionException, 
StatementExecutionException {
     final Tablet tablet = pipeTabletInsertionEvent.convertToTablet();
     if (pipeTabletInsertionEvent.isAligned()) {
       sessionPool.insertAlignedTablet(tablet);
@@ -212,10 +206,10 @@ public class IoTDBSyncConnector implements PipeConnector {
     try {
       doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
     } catch (TException e) {
-      LOGGER.warn(
-          "Network error when transfer tsFile insertion event: {}.", 
tsFileInsertionEvent, e);
-      // The connection may be broken, try to reconnect by catching 
PipeConnectionException
-      throw new PipeConnectionException("Network error when transfer tsFile 
insertion event.", e);
+      throw new PipeConnectionException(
+          String.format(
+              "Network error when transfer tsFile insertion event: %s.", 
tsFileInsertionEvent),
+          e);
     }
   }
 
@@ -253,8 +247,7 @@ public class IoTDBSyncConnector implements PipeConnector {
         } else if (status.code == 
TSStatusCode.SYNC_FILE_REDIRECTION_ERROR.getStatusCode()) {
           position = Long.parseLong(status.message);
           randomAccessFile.seek(position);
-          LOGGER.info(
-              String.format("Redirect to position %s in transferring tsFile 
%s.", position, file));
+          LOGGER.info("Redirect to position {} in transferring tsFile {}.", 
position, file);
         } else if (status.code == 
TSStatusCode.SYNC_FILE_ERROR.getStatusCode()) {
           String errorMsg =
               String.format("Network failed to receive tsFile %s, status: %s", 
file, status);
@@ -263,8 +256,11 @@ public class IoTDBSyncConnector implements PipeConnector {
         }
       }
     } catch (TException e) {
-      LOGGER.warn(String.format("Cannot send pipe data to receiver %s:%s.", 
ipAddress, port), e);
-      throw new PipeConnectionException(e.getMessage(), e);
+      throw new PipeConnectionException(
+          String.format(
+              "Cannot send pipe data to receiver %s:%s, because: %s.",
+              ipAddress, port, e.getMessage()),
+          e);
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java
index c2b651c1e62..ff6f9faad3e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/IoTDBSyncReceiver.java
@@ -105,12 +105,11 @@ public class IoTDBSyncReceiver {
       new File(getFileDataDir(identityInfo)).mkdirs();
     }
     createConnection(identityInfo);
-    if (!StringUtils.isEmpty(identityInfo.getDatabase())) {
-      if (!registerDatabase(identityInfo.getDatabase(), partitionFetcher, 
schemaFetcher)) {
-        return RpcUtils.getStatus(
-            TSStatusCode.PIPESERVER_ERROR,
-            String.format("Auto register database %s error.", 
identityInfo.getDatabase()));
-      }
+    if (!StringUtils.isEmpty(identityInfo.getDatabase())
+        && !registerDatabase(identityInfo.getDatabase(), partitionFetcher, 
schemaFetcher)) {
+      return RpcUtils.getStatus(
+          TSStatusCode.PIPESERVER_ERROR,
+          String.format("Auto register database %s error.", 
identityInfo.getDatabase()));
     }
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
   }
@@ -245,7 +244,9 @@ public class IoTDBSyncReceiver {
                 targetFile
                     .getName()
                     .substring(0, targetFile.getName().length() - 
PATCH_SUFFIX.length()));
-        targetFile.renameTo(newFile);
+        if (!targetFile.renameTo(newFile)) {
+          LOGGER.error("Fail to rename file {} to {}", targetFile, newFile);
+        }
       }
     }
     tsFilePipeData.setParentDirPath(dir.getAbsolutePath());
@@ -276,14 +277,9 @@ public class IoTDBSyncReceiver {
     File file = new File(fileDir, fileName + PATCH_SUFFIX);
 
     // step2. check startIndex
-    try {
-      IndexCheckResult result = checkStartIndexValid(new File(fileDir, 
fileName), startIndex);
-      if (!result.isResult()) {
-        return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REDIRECTION_ERROR, 
result.getIndex());
-      }
-    } catch (IOException e) {
-      LOGGER.error(e.getMessage());
-      return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
+    IndexCheckResult result = checkStartIndexValid(new File(fileDir, 
fileName), startIndex);
+    if (!result.isResult()) {
+      return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REDIRECTION_ERROR, 
result.getIndex());
     }
 
     // step3. append file
@@ -294,14 +290,7 @@ public class IoTDBSyncReceiver {
       buff.get(byteArray);
       randomAccessFile.write(byteArray);
       recordStartIndex(new File(fileDir, fileName), startIndex + length);
-      LOGGER.debug(
-          "Sync "
-              + fileName
-              + " start at "
-              + startIndex
-              + " to "
-              + (startIndex + length)
-              + " is done.");
+      LOGGER.debug("Sync {} start at {} to {} is done.", fileName, startIndex, 
startIndex + length);
     } catch (IOException e) {
       LOGGER.error(e.getMessage());
       return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
@@ -310,7 +299,7 @@ public class IoTDBSyncReceiver {
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
   }
 
-  private IndexCheckResult checkStartIndexValid(File file, long startIndex) 
throws IOException {
+  private IndexCheckResult checkStartIndexValid(File file, long startIndex) {
     // get local index from memory map
     long localIndex = getCurrentFileStartIndex(file.getAbsolutePath());
     // get local index from file
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/DeletionPipeData.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/DeletionPipeData.java
index a98d19b5356..0e72acd6f67 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/DeletionPipeData.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/DeletionPipeData.java
@@ -52,10 +52,14 @@ public class DeletionPipeData extends PipeData {
   }
 
   @Override
-  public void deserialize(DataInputStream stream) throws IOException, 
IllegalPathException {
+  public void deserialize(DataInputStream stream) throws IOException {
     super.deserialize(stream);
     database = ReadWriteIOUtils.readString(stream);
-    deletion = Deletion.deserializeWithoutFileOffset(stream);
+    try {
+      deletion = Deletion.deserializeWithoutFileOffset(stream);
+    } catch (IllegalPathException e) {
+      throw new IOException(e);
+    }
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/PipeData.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/PipeData.java
index 659c69876e9..86eca1c3585 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/PipeData.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/PipeData.java
@@ -64,12 +64,11 @@ public abstract class PipeData {
     return byteStream.toByteArray();
   }
 
-  public void deserialize(DataInputStream stream) throws IOException, 
IllegalPathException {
+  public void deserialize(DataInputStream stream) throws IOException {
     serialNumber = stream.readLong();
   }
 
-  public static PipeData createPipeData(DataInputStream stream)
-      throws IOException, IllegalPathException {
+  public static PipeData createPipeData(DataInputStream stream) throws 
IOException {
     PipeData pipeData;
     PipeDataType type = PipeDataType.getPipeDataType(stream.readByte());
     switch (type) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java
index 803a5c67427..54f05eef81d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/legacy/pipedata/TsFilePipeData.java
@@ -19,7 +19,6 @@
  */
 package org.apache.iotdb.db.pipe.connector.legacy.pipedata;
 
-import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.db.pipe.connector.legacy.loader.ILoader;
 import org.apache.iotdb.db.pipe.connector.legacy.loader.TsFileLoader;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -99,7 +98,7 @@ public class TsFilePipeData extends PipeData {
   }
 
   @Override
-  public void deserialize(DataInputStream stream) throws IOException, 
IllegalPathException {
+  public void deserialize(DataInputStream stream) throws IOException {
     super.deserialize(stream);
     parentDirPath = ReadWriteIOUtils.readString(stream);
     if (parentDirPath == null) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
index bb18acda62d..4160eaeb074 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
@@ -23,8 +23,6 @@ import 
org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.connector.v1.reponse.PipeTransferFilePieceResp;
 import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFilePieceReq;
 import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFileSealReq;
@@ -65,15 +63,12 @@ public class IoTDBThriftConnectorV1 implements 
PipeConnector {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBThriftConnectorV1.class);
 
   private static final CommonConfig COMMON_CONFIG = 
CommonDescriptor.getInstance().getConfig();
-  private static final IoTDBConfig IOTDB_CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
 
   private String ipAddress;
   private int port;
 
   private IoTDBThriftConnectorClient client;
 
-  public IoTDBThriftConnectorV1() {}
-
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
     validator
@@ -112,13 +107,17 @@ public class IoTDBThriftConnectorV1 implements 
PipeConnector {
         throw new PipeException(String.format("Handshake error, result status 
%s.", resp.status));
       }
     } catch (TException e) {
-      LOGGER.warn(String.format("Connect to receiver %s:%s error.", ipAddress, 
port), e);
-      throw new PipeConnectionException(e.getMessage(), e);
+      throw new PipeConnectionException(
+          String.format(
+              "Connect to receiver %s:%s error, because: %s", ipAddress, port, 
e.getMessage()),
+          e);
     }
   }
 
   @Override
-  public void heartbeat() throws Exception {}
+  public void heartbeat() throws Exception {
+    // do nothing
+  }
 
   @Override
   public void transfer(TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
@@ -133,12 +132,10 @@ public class IoTDBThriftConnectorV1 implements 
PipeConnector {
             "IoTDBThriftConnectorV1 only support 
PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent.");
       }
     } catch (TException e) {
-      LOGGER.warn(
-          "Network error when transfer tablet insertion event: {}.", 
tabletInsertionEvent, e);
-      // the connection may be broken, try to reconnect by catching 
PipeConnectionException
       throw new PipeConnectionException(
           String.format(
-              "Network error when transfer tablet insertion event, because 
%s.", e.getMessage()),
+              "Network error when transfer tablet insertion event %s, because 
%s.",
+              tabletInsertionEvent, e.getMessage()),
           e);
     }
   }
@@ -185,12 +182,10 @@ public class IoTDBThriftConnectorV1 implements 
PipeConnector {
     try {
       doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
     } catch (TException e) {
-      LOGGER.warn(
-          "Network error when transfer tsfile insertion event: {}.", 
tsFileInsertionEvent, e);
-      // the connection may be broken, try to reconnect by catching 
PipeConnectionException
       throw new PipeConnectionException(
           String.format(
-              "Network error when transfer tsfile insertion event, because 
%s.", e.getMessage()),
+              "Network error when transfer tsfile insertion event %s, because 
%s.",
+              tsFileInsertionEvent, e.getMessage()),
           e);
     }
   }
@@ -229,7 +224,7 @@ public class IoTDBThriftConnectorV1 implements 
PipeConnector {
             == TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) {
           position = resp.getEndWritingOffset();
           reader.seek(position);
-          LOGGER.info(String.format("Redirect file position to %s.", 
position));
+          LOGGER.info("Redirect file position to {}.", position);
           continue;
         }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java
index cc803c33b95..de1df42b73a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.java
@@ -170,18 +170,25 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
       writingFileWriter = null;
     }
     if (writingFile != null && writingFile.exists()) {
-      final boolean ignored = writingFile.delete();
-      LOGGER.info(String.format("original file %s was deleted.", 
writingFile.getPath()));
+      if (writingFile.delete()) {
+        LOGGER.info("original file {} was deleted.", writingFile.getPath());
+      } else {
+        LOGGER.warn("failed to delete original file {}.", 
writingFile.getPath());
+      }
       writingFile = null;
     }
 
     final File receiveDir = new File(RECEIVER_FILE_DIR);
     if (!receiveDir.exists()) {
-      boolean ignored = receiveDir.mkdirs();
+      if (receiveDir.mkdirs()) {
+        LOGGER.info("receiver file dir {} was created.", receiveDir.getPath());
+      } else {
+        LOGGER.warn("failed to create receiver file dir {}.", 
receiveDir.getPath());
+      }
     }
     writingFile = new File(RECEIVER_FILE_DIR, fileName);
     writingFileWriter = new RandomAccessFile(writingFile, "rw");
-    LOGGER.info(String.format("start to write transferring file %s.", 
writingFile.getPath()));
+    LOGGER.info("start to write transferring file {}.", writingFile.getPath());
   }
 
   private boolean isFileExistedAndNameCorrect(String fileName) {
@@ -288,13 +295,9 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
       if (writingFileWriter != null) {
         writingFileWriter.close();
       }
-      if (writingFile != null) {
-        if (!writingFile.delete()) {
-          LOGGER.warn(
-              String.format(
-                  "IoTDBThriftReceiverV1#handleExit: delete file %s error.",
-                  writingFile.getPath()));
-        }
+      if (writingFile != null && !writingFile.delete()) {
+        LOGGER.warn(
+            "IoTDBThriftReceiverV1#handleExit: delete file {} error.", 
writingFile.getPath());
       }
     } catch (IOException e) {
       LOGGER.warn("IoTDBThriftReceiverV1#handleExit: meeting errors on 
handleExit().", e);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/PipeRequestType.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/PipeRequestType.java
index cdbb5a59eef..b24ef0e9066 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/PipeRequestType.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v1/PipeRequestType.java
@@ -45,7 +45,10 @@ public enum PipeRequestType {
 
   private static final Map<Short, PipeRequestType> TYPE_MAP =
       Arrays.stream(PipeRequestType.values())
-          .collect(HashMap::new, (map, type) -> map.put(type.getType(), type), 
HashMap::putAll);
+          .collect(
+              HashMap::new,
+              (typeMap, pipeRequestType) -> 
typeMap.put(pipeRequestType.getType(), pipeRequestType),
+              HashMap::putAll);
 
   public static boolean isValidatedRequestType(short type) {
     return TYPE_MAP.containsKey(type);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java
index 6b150c53357..daf95b44add 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java
@@ -42,7 +42,7 @@ public abstract class 
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
     implements AsyncMethodCallback<E> {
 
   private static final Logger LOGGER =
-      
LoggerFactory.getLogger(PipeTransferInsertNodeTabletInsertionEventHandler.class);
+      LoggerFactory.getLogger(PipeTransferTabletInsertionEventHandler.class);
 
   private final long requestCommitId;
   private final EnrichedEvent event;
@@ -54,7 +54,7 @@ public abstract class 
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
       (long) (PipeConfig.getInstance().getPipeConnectorRetryIntervalMs() * 
Math.pow(2, 5));
   private int retryCount = 0;
 
-  public PipeTransferTabletInsertionEventHandler(
+  protected PipeTransferTabletInsertionEventHandler(
       long requestCommitId,
       @Nullable EnrichedEvent event,
       TPipeTransferReq req,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java
index 21c244f0537..692fa0f7ada 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -151,7 +151,7 @@ public class PipeTransferTsFileInsertionEventHandler
       if (code == 
TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) {
         position = resp.getEndWritingOffset();
         reader.seek(position);
-        LOGGER.info(String.format("Redirect file position to %s.", position));
+        LOGGER.info("Redirect file position to {}.", position);
       } else if (code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         throw new PipeException(
             String.format("Transfer file %s error, result status %s.", tsFile, 
resp.getStatus()));
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
index f2df19bf2b8..462ba9164f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java
@@ -40,7 +40,7 @@ public abstract class EnrichedEvent implements Event {
 
   private final String pattern;
 
-  public EnrichedEvent(PipeTaskMeta pipeTaskMeta, String pattern) {
+  protected EnrichedEvent(PipeTaskMeta pipeTaskMeta, String pattern) {
     referenceCount = new AtomicInteger(0);
     this.pipeTaskMeta = pipeTaskMeta;
     this.pattern = pattern;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 0933e1a3c71..80d6eaf7879 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -124,7 +124,6 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
       }
       return dataContainer.processRowByRow(consumer);
     } catch (Exception e) {
-      LOGGER.error("Process row by row error.", e);
       throw new PipeException("Process row by row error.", e);
     }
   }
@@ -137,7 +136,6 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
       }
       return dataContainer.processTablet(consumer);
     } catch (Exception e) {
-      LOGGER.error("Process tablet error.", e);
       throw new PipeException("Process tablet error.", e);
     }
   }
@@ -155,7 +153,6 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
       }
       return dataContainer.convertToTablet();
     } catch (Exception e) {
-      LOGGER.error("Convert to tablet error.", e);
       throw new PipeException("Convert to tablet error.", e);
     }
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 594c0a38291..168400443c8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -75,16 +75,16 @@ public class PipeRawTabletInsertionEvent implements 
TabletInsertionEvent {
   }
 
   public Tablet convertToTablet() {
-    final String pattern = getPattern();
+    final String notNullPattern = getPattern();
 
-    // if pattern is "root", we don't need to convert, just return the 
original tablet
-    if (pattern.equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) 
{
+    // if notNullPattern is "root", we don't need to convert, just return the 
original tablet
+    if 
(notNullPattern.equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) {
       return tablet;
     }
 
-    // if pattern is not "root", we need to convert the tablet
+    // if notNullPattern is not "root", we need to convert the tablet
     if (dataContainer == null) {
-      dataContainer = new TabletInsertionDataContainer(tablet, isAligned, 
pattern);
+      dataContainer = new TabletInsertionDataContainer(tablet, isAligned, 
notNullPattern);
     }
     return dataContainer.convertToTablet();
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
index 86fbbd28f6e..ede8ce91303 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.pipe.event.common.tablet;
 
-import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
@@ -28,7 +27,6 @@ import 
org.apache.iotdb.db.pipe.event.common.row.PipeRowCollector;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -62,18 +60,13 @@ public class TabletInsertionDataContainer {
   private Tablet tablet;
 
   public TabletInsertionDataContainer(InsertNode insertNode, String pattern) {
-    try {
-      if (insertNode instanceof InsertRowNode) {
-        parse((InsertRowNode) insertNode, pattern);
-      } else if (insertNode instanceof InsertTabletNode) {
-        parse((InsertTabletNode) insertNode, pattern);
-      } else {
-        throw new UnSupportedDataTypeException(
-            String.format("InsertNode type %s is not supported.", 
insertNode.getClass().getName()));
-      }
-    } catch (IllegalPathException e) {
-      throw new PipeException(
-          String.format("Failed to parse insertNode with pattern %s.", 
pattern), e);
+    if (insertNode instanceof InsertRowNode) {
+      parse((InsertRowNode) insertNode, pattern);
+    } else if (insertNode instanceof InsertTabletNode) {
+      parse((InsertTabletNode) insertNode, pattern);
+    } else {
+      throw new UnSupportedDataTypeException(
+          String.format("InsertNode type %s is not supported.", 
insertNode.getClass().getName()));
     }
   }
 
@@ -83,7 +76,7 @@ public class TabletInsertionDataContainer {
 
   //////////////////////////// parse ////////////////////////////
 
-  private void parse(InsertRowNode insertRowNode, String pattern) throws 
IllegalPathException {
+  private void parse(InsertRowNode insertRowNode, String pattern) {
     final int originColumnSize = insertRowNode.getMeasurements().length;
     final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new 
Integer[originColumnSize];
 
@@ -149,8 +142,7 @@ public class TabletInsertionDataContainer {
     rowCount = 1;
   }
 
-  private void parse(InsertTabletNode insertTabletNode, String pattern)
-      throws IllegalPathException {
+  private void parse(InsertTabletNode insertTabletNode, String pattern) {
     final int originColumnSize = insertTabletNode.getMeasurements().length;
     final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new 
Integer[originColumnSize];
 
@@ -269,7 +261,6 @@ public class TabletInsertionDataContainer {
     rowCount = tablet.rowSize;
   }
 
-  // TODO: cache the result keyed by deviceId to improve performance
   private void generateColumnIndexMapper(
       String[] originMeasurementList,
       String pattern,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 40aaf53c1ec..7e06fb0e01f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -170,12 +170,12 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
       final String errorMsg =
           String.format(
               "Interrupted when waiting for closing TsFile %s.", 
resource.getTsFilePath());
-      LOGGER.warn(errorMsg);
+      LOGGER.warn(errorMsg, e);
       Thread.currentThread().interrupt();
       throw new PipeException(errorMsg);
     } catch (IOException e) {
       final String errorMsg = String.format("Read TsFile %s error.", 
resource.getTsFilePath());
-      LOGGER.warn(errorMsg);
+      LOGGER.warn(errorMsg, e);
       throw new PipeException(errorMsg);
     }
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index a9a5f657d83..0d72fd06e53 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -79,7 +79,6 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
       deviceIsAlignedMap = readDeviceIsAlignedMap();
       measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap();
     } catch (Exception e) {
-      LOGGER.error("failed to create TsFileInsertionDataContainer", e);
       close();
       throw e;
     }
@@ -125,14 +124,14 @@ public class TsFileInsertionDataContainer implements 
AutoCloseable {
   }
 
   private Map<String, Boolean> readDeviceIsAlignedMap() throws IOException {
-    final Map<String, Boolean> deviceIsAlignedMap = new HashMap<>();
+    final Map<String, Boolean> deviceIsAlignedResultMap = new HashMap<>();
     final TsFileDeviceIterator deviceIsAlignedIterator =
         tsFileSequenceReader.getAllDevicesIteratorWithIsAligned();
     while (deviceIsAlignedIterator.hasNext()) {
       final Pair<String, Boolean> deviceIsAlignedPair = 
deviceIsAlignedIterator.next();
-      deviceIsAlignedMap.put(deviceIsAlignedPair.getLeft(), 
deviceIsAlignedPair.getRight());
+      deviceIsAlignedResultMap.put(deviceIsAlignedPair.getLeft(), 
deviceIsAlignedPair.getRight());
     }
-    return deviceIsAlignedMap;
+    return deviceIsAlignedResultMap;
   }
 
   /** @return TabletInsertionEvent in a streaming way */
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java
index 22905643a02..ff42491bd35 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeSubtaskExecutorManager.java
@@ -19,17 +19,13 @@
 
 package org.apache.iotdb.db.pipe.execution.executor;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * PipeTaskExecutor is responsible for executing the pipe tasks, and it is 
scheduled by the
  * PipeTaskScheduler. It is a singleton class.
  */
+@SuppressWarnings("unused") // assignerSubtaskExecutor is for future use
 public class PipeSubtaskExecutorManager {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeSubtaskExecutorManager.class);
-
   private final PipeAssignerSubtaskExecutor assignerSubtaskExecutor;
   private final PipeProcessorSubtaskExecutor processorSubtaskExecutor;
   private final PipeConnectorSubtaskExecutor connectorSubtaskExecutor;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
index ca81f1c7643..8d12a802978 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
@@ -79,12 +79,8 @@ public class PipeFileResourceManager {
   }
 
   private boolean increaseReferenceIfExists(String path) {
-    if (hardlinkOrCopiedFileToReferenceMap.containsKey(path)) {
-      hardlinkOrCopiedFileToReferenceMap.put(
-          path, hardlinkOrCopiedFileToReferenceMap.get(path) + 1);
-      return true;
-    }
-    return false;
+    hardlinkOrCopiedFileToReferenceMap.computeIfPresent(path, (key, value) -> 
value + 1);
+    return hardlinkOrCopiedFileToReferenceMap.containsKey(path);
   }
 
   private static File getHardlinkOrCopiedFileInPipeDir(File file) throws 
IOException {
@@ -125,8 +121,11 @@ public class PipeFileResourceManager {
   }
 
   private static File createHardLink(File sourceFile, File hardlink) throws 
IOException {
-    if (!hardlink.getParentFile().exists()) {
-      boolean ignored = hardlink.getParentFile().mkdirs();
+    if (!hardlink.getParentFile().exists() && 
!hardlink.getParentFile().mkdirs()) {
+      throw new IOException(
+          String.format(
+              "failed to create hardlink %s for file %s: failed to create 
parent dir %s",
+              hardlink.getPath(), sourceFile.getPath(), 
hardlink.getParentFile().getPath()));
     }
 
     final Path sourcePath = 
FileSystems.getDefault().getPath(sourceFile.getAbsolutePath());
@@ -136,8 +135,11 @@ public class PipeFileResourceManager {
   }
 
   private static File copyFile(File sourceFile, File targetFile) throws 
IOException {
-    if (!targetFile.getParentFile().exists()) {
-      boolean ignored = targetFile.getParentFile().mkdirs();
+    if (!targetFile.getParentFile().exists() && 
!targetFile.getParentFile().mkdirs()) {
+      throw new IOException(
+          String.format(
+              "failed to copy file %s to %s: failed to create parent dir %s",
+              sourceFile.getPath(), targetFile.getPath(), 
targetFile.getParentFile().getPath()));
     }
 
     Files.copy(sourceFile.toPath(), targetFile.toPath());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
index b4a01a7828b..b882a96962d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
@@ -27,11 +27,12 @@ import org.apache.iotdb.db.wal.utils.WALEntryHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class PipeWALResource implements AutoCloseable {
+public class PipeWALResource implements Closeable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeWALResource.class);
 
@@ -39,8 +40,7 @@ public class PipeWALResource implements AutoCloseable {
 
   private final AtomicInteger referenceCount;
 
-  // TODO: make this configurable
-  public static final long MIN_TIME_TO_LIVE_IN_MS = 1000 * 60;
+  public static final long MIN_TIME_TO_LIVE_IN_MS = 1000L * 60;
   private final AtomicLong lastLogicalPinTime;
   private final AtomicBoolean isPhysicallyPinned;
 
@@ -155,8 +155,4 @@ public class PipeWALResource implements AutoCloseable {
 
     referenceCount.set(0);
   }
-
-  public int getReferenceCount() {
-    return referenceCount.get();
-  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index de67bb8543f..6fdf3a84ed0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -9,11 +9,10 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
-public class PipeWALResourceManager implements AutoCloseable {
+public class PipeWALResourceManager {
 
   private final Map<Long, PipeWALResource> memtableIdToPipeWALResourceMap;
 
@@ -23,7 +22,6 @@ public class PipeWALResourceManager implements AutoCloseable {
   private static final ScheduledExecutorService PIPE_WAL_RESOURCE_TTL_CHECKER =
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
           ThreadName.PIPE_WAL_RESOURCE_TTL_CHECKER.getName());
-  private final ScheduledFuture<?> ttlCheckerFuture;
 
   public PipeWALResourceManager() {
     // memtableIdToPipeWALResourceMap can be concurrently accessed by multiple 
threads
@@ -34,30 +32,29 @@ public class PipeWALResourceManager implements 
AutoCloseable {
       memtableIdSegmentLocks[i] = new ReentrantLock();
     }
 
-    ttlCheckerFuture =
-        ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-            PIPE_WAL_RESOURCE_TTL_CHECKER,
-            () -> {
-              Iterator<Map.Entry<Long, PipeWALResource>> iterator =
-                  memtableIdToPipeWALResourceMap.entrySet().iterator();
-              while (iterator.hasNext()) {
-                final Map.Entry<Long, PipeWALResource> entry = iterator.next();
-                final ReentrantLock lock =
-                    memtableIdSegmentLocks[(int) (entry.getKey() % 
SEGMENT_LOCK_COUNT)];
-
-                lock.lock();
-                try {
-                  if (entry.getValue().invalidateIfPossible()) {
-                    iterator.remove();
-                  }
-                } finally {
-                  lock.unlock();
-                }
+    ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+        PIPE_WAL_RESOURCE_TTL_CHECKER,
+        () -> {
+          Iterator<Map.Entry<Long, PipeWALResource>> iterator =
+              memtableIdToPipeWALResourceMap.entrySet().iterator();
+          while (iterator.hasNext()) {
+            final Map.Entry<Long, PipeWALResource> entry = iterator.next();
+            final ReentrantLock lock =
+                memtableIdSegmentLocks[(int) (entry.getKey() % 
SEGMENT_LOCK_COUNT)];
+
+            lock.lock();
+            try {
+              if (entry.getValue().invalidateIfPossible()) {
+                iterator.remove();
               }
-            },
-            PipeWALResource.MIN_TIME_TO_LIVE_IN_MS,
-            PipeWALResource.MIN_TIME_TO_LIVE_IN_MS,
-            TimeUnit.MILLISECONDS);
+            } finally {
+              lock.unlock();
+            }
+          }
+        },
+        PipeWALResource.MIN_TIME_TO_LIVE_IN_MS,
+        PipeWALResource.MIN_TIME_TO_LIVE_IN_MS,
+        TimeUnit.MILLISECONDS);
   }
 
   public void pin(long memtableId, WALEntryHandler walEntryHandler) {
@@ -83,34 +80,4 @@ public class PipeWALResourceManager implements AutoCloseable 
{
       lock.unlock();
     }
   }
-
-  @Override
-  public void close() throws Exception {
-    if (ttlCheckerFuture != null) {
-      ttlCheckerFuture.cancel(true);
-    }
-
-    for (final long memtableId : memtableIdToPipeWALResourceMap.keySet()) {
-      final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % 
SEGMENT_LOCK_COUNT)];
-
-      lock.lock();
-      try {
-        memtableIdToPipeWALResourceMap.get(memtableId).close();
-        memtableIdToPipeWALResourceMap.remove(memtableId);
-      } finally {
-        lock.unlock();
-      }
-    }
-  }
-
-  public int getReferenceCount(long memtableId) {
-    final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % 
SEGMENT_LOCK_COUNT)];
-
-    lock.lock();
-    try {
-      return 
memtableIdToPipeWALResourceMap.get(memtableId).getReferenceCount();
-    } finally {
-      lock.unlock();
-    }
-  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/EventSupplier.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/EventSupplier.java
index 0efb7a2ae67..c00d294be00 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/EventSupplier.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/connection/EventSupplier.java
@@ -29,5 +29,6 @@ public interface EventSupplier {
    *     the moment, but the collector is still running for more events.
    * @throws Exception if the supplier fails to supply the event.
    */
+  @SuppressWarnings("squid:S00112") // Exception is thrown by the interface
   Event supply() throws Exception;
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index 6c6f2498b97..0d676ac4992 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -43,8 +43,7 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
       PipeParameters collectorParameters,
       TConsensusGroupId dataRegionId,
       PipeTaskMeta pipeTaskMeta) {
-    // TODO: avoid if-else, use reflection to create collector all the time
-    this.pipeCollector =
+    pipeCollector =
         collectorParameters
                 .getStringOrDefault(
                     PipeCollectorConstant.COLLECTOR_KEY,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index ff4e954e614..ba8058524ae 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -46,7 +46,9 @@ public class PipeTaskConnectorStage extends PipeTaskStage {
   }
 
   @Override
-  public void createSubtask() throws PipeException {}
+  public void createSubtask() throws PipeException {
+    // do nothing
+  }
 
   @Override
   public void startSubtask() throws PipeException {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
index e3a793c0d8a..c2b4426435d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskStage.java
@@ -24,6 +24,15 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 
 public abstract class PipeTaskStage {
 
+  private static final String MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_STARTED =
+      "The PipeTaskStage has been started";
+  private static final String MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_DROPPED =
+      "The PipeTaskStage has been dropped";
+  private static final String MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_STOPPED =
+      "The PipeTaskStage has been externally stopped";
+  private static final String MESSAGE_PIPE_TASK_STAGE_HAS_NOT_BEEN_CREATED =
+      "The PipeTaskStage has not been created";
+
   protected PipeStatus status = null;
   protected boolean hasBeenExternallyStopped = false;
 
@@ -35,14 +44,14 @@ public abstract class PipeTaskStage {
   public synchronized void create() {
     if (status != null) {
       if (status == PipeStatus.RUNNING) {
-        throw new PipeException("The PipeTaskStage has been started");
+        throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_STARTED);
       }
       if (status == PipeStatus.DROPPED) {
-        throw new PipeException("The PipeTaskStage has been dropped");
+        throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_DROPPED);
       }
       // status == PipeStatus.STOPPED
       if (hasBeenExternallyStopped) {
-        throw new PipeException("The PipeTaskStage has been externally 
stopped");
+        throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_STOPPED);
       }
       // otherwise, do nothing to allow retry strategy
       return;
@@ -63,14 +72,14 @@ public abstract class PipeTaskStage {
    */
   public synchronized void start() {
     if (status == null) {
-      throw new PipeException("The PipeTaskStage has not been created");
+      throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_NOT_BEEN_CREATED);
     }
     if (status == PipeStatus.RUNNING) {
       // do nothing to allow retry strategy
       return;
     }
     if (status == PipeStatus.DROPPED) {
-      throw new PipeException("The PipeTaskStage has been dropped");
+      throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_DROPPED);
     }
 
     // status == PipeStatus.STOPPED, start the subtask
@@ -88,14 +97,14 @@ public abstract class PipeTaskStage {
    */
   public synchronized void stop() {
     if (status == null) {
-      throw new PipeException("The PipeTaskStage has not been created");
+      throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_NOT_BEEN_CREATED);
     }
     if (status == PipeStatus.STOPPED) {
       // do nothing to allow retry strategy
       return;
     }
     if (status == PipeStatus.DROPPED) {
-      throw new PipeException("The PipeTaskStage has been dropped");
+      throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_BEEN_DROPPED);
     }
 
     // status == PipeStatus.RUNNING, stop the connector
@@ -114,7 +123,7 @@ public abstract class PipeTaskStage {
    */
   public synchronized void drop() {
     if (status == null) {
-      throw new PipeException("The PipeTaskStage has not been created");
+      throw new PipeException(MESSAGE_PIPE_TASK_STAGE_HAS_NOT_BEEN_CREATED);
     }
     if (status == PipeStatus.DROPPED) {
       // do nothing to allow retry strategy
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index ce2b6403a2f..47dd1ba7e03 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -86,7 +86,6 @@ public class PipeConnectorSubtask extends PipeSubtask {
     } catch (PipeConnectionException e) {
       throw e;
     } catch (Exception e) {
-      LOGGER.warn("Execute Connector subtask once error.", e);
       throw new PipeException(
           "Error occurred during executing PipeConnector#transfer, perhaps 
need to check whether the implementation of PipeConnector is correct according 
to the pipe-api description.",
           e);
@@ -99,19 +98,29 @@ public class PipeConnectorSubtask extends PipeSubtask {
   public void onFailure(@NotNull Throwable throwable) {
     // retry to connect to the target system if the connection is broken
     if (throwable instanceof PipeConnectionException) {
+      LOGGER.warn(
+          "PipeConnectionException occurred, retrying to connect to the target 
system...",
+          throwable);
+
       int retry = 0;
       while (retry < MAX_RETRY_TIMES) {
         try {
           outputPipeConnector.handshake();
+          LOGGER.info("Successfully reconnected to the target system.");
           break;
         } catch (Exception e) {
           retry++;
-          LOGGER.error("Failed to reconnect to the target system, retrying... 
({} time(s))", retry);
+          LOGGER.warn(
+              "Failed to reconnect to the target system, retrying ... after 
[{}/{}] time(s) retries.",
+              retry,
+              MAX_RETRY_TIMES,
+              e);
           try {
             Thread.sleep(retry * 
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
           } catch (InterruptedException interruptedException) {
             LOGGER.info(
-                "Interrupted while sleeping, perhaps need to check whether the 
thread is interrupted.");
+                "Interrupted while sleeping, perhaps need to check whether the 
thread is interrupted.",
+                interruptedException);
             Thread.currentThread().interrupt();
           }
         }
@@ -120,22 +129,35 @@ public class PipeConnectorSubtask extends PipeSubtask {
       // stop current pipe task if failed to reconnect to the target system 
after MAX_RETRY_TIMES
       // times
       if (retry == MAX_RETRY_TIMES) {
-        final String errorMessage =
-            String.format(
-                "Failed to reconnect to the target system after %d times, 
stopping current pipe task %s...",
-                MAX_RETRY_TIMES, taskID);
-        LOGGER.warn(errorMessage, throwable);
-        lastFailedCause = throwable;
-
         if (lastEvent instanceof EnrichedEvent) {
+          LOGGER.warn(
+              "Failed to reconnect to the target system after {} times, 
stopping current pipe task {}... "
+                  + "Status shown when query the pipe will be 'STOPPED'. "
+                  + "Please restart the task by executing 'START PIPE' 
manually if needed.",
+              MAX_RETRY_TIMES,
+              taskID,
+              throwable);
+
           ((EnrichedEvent) lastEvent)
               .reportException(new 
PipeRuntimeConnectorCriticalException(throwable.getMessage()));
+        } else {
+          LOGGER.error(
+              "Failed to reconnect to the target system after {} times, 
stopping current pipe task {} locally... "
+                  + "Status shown when query the pipe will be 'RUNNING' 
instead of 'STOPPED', but the task is actually stopped. "
+                  + "Please restart the task by executing 'START PIPE' 
manually if needed.",
+              MAX_RETRY_TIMES,
+              taskID,
+              throwable);
+
+          // FIXME: non-EnrichedEvent should be reported to the ConfigNode 
instead of being logged
         }
 
         // although the pipe task will be stopped, we still don't release the 
last event here
         // because we need to keep it for the next retry. if user wants to 
restart the task,
         // the last event will be processed again. the last event will be 
released when the task
         // is dropped or the process is running normally.
+
+        // stop current pipe task if failed to reconnect to the target system 
after MAX_RETRY_TIMES
         return;
       }
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
index 071ab4da15e..9c56b4e04db 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtaskManager.java
@@ -42,6 +42,9 @@ import java.util.TreeMap;
 
 public class PipeConnectorSubtaskManager {
 
+  private static final String FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE =
+      "Failed to deregister PipeConnectorSubtask. No such subtask: ";
+
   private final Map<String, PipeConnectorSubtaskLifeCycle>
       attributeSortedString2SubtaskLifeCycleMap = new HashMap<>();
 
@@ -53,7 +56,6 @@ public class PipeConnectorSubtaskManager {
         new TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
 
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
-      // TODO: construct all PipeConnector with the same reflection method, 
avoid using if-else
       // 1. construct, validate and customize PipeConnector, and then 
handshake (create connection)
       // with the target
       final String connectorKey =
@@ -103,8 +105,7 @@ public class PipeConnectorSubtaskManager {
 
   public synchronized void deregister(String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
-      throw new PipeException(
-          "Failed to deregister PipeConnectorSubtask. No such subtask: " + 
attributeSortedString);
+      throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
     }
 
     if 
(attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).deregister())
 {
@@ -114,8 +115,7 @@ public class PipeConnectorSubtaskManager {
 
   public synchronized void start(String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
-      throw new PipeException(
-          "Failed to deregister PipeConnectorSubtask. No such subtask: " + 
attributeSortedString);
+      throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
     }
 
     
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).start();
@@ -123,8 +123,7 @@ public class PipeConnectorSubtaskManager {
 
   public synchronized void stop(String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
-      throw new PipeException(
-          "Failed to deregister PipeConnectorSubtask. No such subtask: " + 
attributeSortedString);
+      throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE + 
attributeSortedString);
     }
 
     
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString).stop();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
index 2472a29eefe..cd7bc206454 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeProcessorSubtask.java
@@ -69,7 +69,6 @@ public class PipeProcessorSubtask extends PipeSubtask {
 
       releaseLastEvent();
     } catch (Exception e) {
-      e.printStackTrace();
       throw new PipeException(
           "Error occurred during executing PipeProcessor#process, perhaps need 
to check whether the implementation of PipeProcessor is correct according to 
the pipe-api description.",
           e);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index 9f5c6e4a37a..4672f8cc532 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -53,7 +53,6 @@ public abstract class PipeSubtask implements 
FutureCallback<Void>, Callable<Void
 
   protected static final int MAX_RETRY_TIMES = 5;
   private final AtomicInteger retryCount = new AtomicInteger(0);
-  protected Throwable lastFailedCause;
 
   protected Event lastEvent;
 
@@ -97,6 +96,7 @@ public abstract class PipeSubtask implements 
FutureCallback<Void>, Callable<Void
    * @return true if the event is consumed successfully, false if no more 
event can be consumed
    * @throws Exception if any error occurs when consuming the event
    */
+  @SuppressWarnings("squid:S112") // allow to throw Exception
   protected abstract boolean executeOnce() throws Exception;
 
   @Override
@@ -107,20 +107,31 @@ public abstract class PipeSubtask implements 
FutureCallback<Void>, Callable<Void
 
   @Override
   public void onFailure(@NotNull Throwable throwable) {
+    if (retryCount.get() == 0) {
+      LOGGER.warn(
+          "Failed to execute subtask {}({}), because of {}. Will retry for {} 
times.",
+          taskID,
+          this.getClass().getSimpleName(),
+          throwable.getMessage(),
+          MAX_RETRY_TIMES,
+          throwable);
+    }
+
     if (retryCount.get() < MAX_RETRY_TIMES) {
       retryCount.incrementAndGet();
       LOGGER.warn(
-          String.format(
-              "Retry subtask %s, retry count [%s/%s]",
-              this.getClass().getSimpleName(), retryCount.get(), 
MAX_RETRY_TIMES));
+          "Retry executing subtask {}({}), retry count [{}/{}]",
+          taskID,
+          this.getClass().getSimpleName(),
+          retryCount.get(),
+          MAX_RETRY_TIMES);
       submitSelf();
     } else {
       final String errorMessage =
           String.format(
-              "Subtask %s failed, has been retried for %d times, last failed 
because of %s",
-              taskID, retryCount.get(), throwable);
+              "Failed to execute subtask %s(%s), retry count exceeds the max 
retry times %d, last exception: %s",
+              taskID, this.getClass().getSimpleName(), retryCount.get(), 
throwable.getMessage());
       LOGGER.warn(errorMessage, throwable);
-      lastFailedCause = throwable;
 
       if (lastEvent instanceof EnrichedEvent) {
         ((EnrichedEvent) lastEvent)
@@ -128,6 +139,23 @@ public abstract class PipeSubtask implements 
FutureCallback<Void>, Callable<Void
                 throwable instanceof PipeRuntimeException
                     ? (PipeRuntimeException) throwable
                     : new PipeRuntimeCriticalException(errorMessage));
+        LOGGER.warn(
+            "The last event is an instance of EnrichedEvent, so the exception 
is reported. "
+                + "Stopping current pipe task {}({}) locally... "
+                + "Status shown when query the pipe will be 'STOPPED'. "
+                + "Please restart the task by executing 'START PIPE' manually 
if needed.",
+            taskID,
+            this.getClass().getSimpleName(),
+            throwable);
+      } else {
+        LOGGER.error(
+            "The last event is not an instance of EnrichedEvent, so the 
exception cannot be reported. "
+                + "Stopping current pipe task {}({}) locally... "
+                + "Status shown when query the pipe will be 'RUNNING' instead 
of 'STOPPED', but the task is actually stopped. "
+                + "Please restart the task by executing 'START PIPE' manually 
if needed.",
+            taskID,
+            this.getClass().getSimpleName(),
+            throwable);
       }
 
       // although the pipe task will be stopped, we still don't release the 
last event here
@@ -185,8 +213,4 @@ public abstract class PipeSubtask implements 
FutureCallback<Void>, Callable<Void
   public String getTaskID() {
     return taskID;
   }
-
-  public Throwable getLastFailedCause() {
-    return lastFailedCause;
-  }
 }


Reply via email to