This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b616502aec0 Pipe Log: Added the remaining Chinese logs & Further 
reduced the repeatable logs (#17700)
b616502aec0 is described below

commit b616502aec0b700d0b7f3a1577e9ecee1edc365b
Author: Caideyipi <[email protected]>
AuthorDate: Fri May 29 14:12:58 2026 +0800

    Pipe Log: Added the remaining Chinese logs & Further reduced the repeatable 
logs (#17700)
    
    * zh
    
    * h
---
 .../iotdb/confignode/i18n/ManagerMessages.java     |  14 +--
 .../apache/iotdb/db/i18n/DataNodePipeMessages.java |  19 ++++
 .../apache/iotdb/db/i18n/DataNodePipeMessages.java |  73 +++++++++------
 .../pipe/agent/plugin/PipeDataNodePluginAgent.java |  22 +++--
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  10 +-
 .../client/IoTDBDataNodeAsyncClientManager.java    |  28 ++++--
 .../iotconsensusv2/IoTConsensusV2AsyncSink.java    | 103 +++++++++++++++------
 .../handler/IoTConsensusV2DeleteEventHandler.java  |  16 +++-
 .../IoTConsensusV2TabletBatchEventHandler.java     |  20 +++-
 .../IoTConsensusV2TabletInsertionEventHandler.java |  17 +++-
 .../IoTConsensusV2TsFileInsertionEventHandler.java |  18 +++-
 .../sink/protocol/legacy/IoTDBLegacyPipeSink.java  |   3 +-
 .../thrift/async/IoTDBDataRegionAsyncSink.java     |  33 +++++--
 .../handler/PipeTransferTrackableHandler.java      |  21 +++--
 .../async/handler/PipeTransferTsFileHandler.java   |  23 ++++-
 .../PipeRealtimeDataRegionHybridSource.java        |   4 +-
 .../realtime/PipeRealtimeDataRegionLogSource.java  |   5 +-
 .../realtime/PipeRealtimeDataRegionSource.java     |   8 +-
 .../PipeRealtimeDataRegionTsFileSource.java        |   5 +-
 .../apache/iotdb/commons/i18n/PipeMessages.java    |  14 +++
 .../apache/iotdb/commons/i18n/PipeMessages.java    |  14 +++
 .../pipe/datastructure/pattern/TreePattern.java    |  36 +++----
 22 files changed, 351 insertions(+), 155 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java
 
b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java
index 836fe7dd60b..e7321e5376c 100644
--- 
a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java
+++ 
b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java
@@ -28,11 +28,11 @@ public final class ManagerMessages {
   public static final String 
AFTER_THIS_SUCCESSFUL_SYNC_IF_SUBSCRIPTIONINFO_IS_EMPTY_DURING_THIS =
       "After this successful sync, if SubscriptionInfo is empty during this 
sync and has not been modified afterwards, all subsequent syncs will be 
skipped";
   public static final String 
ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A_NULL_PIPETASKMETA =
-      "Attempt to report pipe exception to a null PipeTaskMeta.";
+      "尝试向空的 PipeTaskMeta 上报 pipe 异常。";
   public static final String AUTH_RUN_AUTH_PLAN = "Auth: run auth plan: {}";
   public static final String CLUSTERID = "clusterID: {}";
   public static final String COLLECTING_PIPE_HEARTBEAT_FROM_DATA_NODES =
-      "Collecting pipe heartbeat {} from data nodes";
+      "正在从 data nodes 收集 pipe 心跳 {}";
   public static final String CONNECTION_FROM_DATANODE_TO_DATANODE_IS_BROKEN =
       "Connection from DataNode {} to DataNode {} is broken";
   public static final String CONSENSUSGROUPSTATISTICS = 
"[ConsensusGroupStatistics]\t {}: {} -> {}";
@@ -128,7 +128,7 @@ public final class ManagerMessages {
   public static final String FAILED_TO_CREATE_PEER_FOR_CONSENSUS_GROUP =
       "Failed to create peer for consensus group";
   public static final String FAILED_TO_CREATE_PIPE_RESULT_STATUS =
-      "Failed to create pipe {}. Result status: {}.";
+      "创建 pipe {} 失败。结果状态:{}。";
   public static final String FAILED_TO_CREATE_SUBTASK_FOR_PIPE_CREATION_TIME =
       "Failed to create subtask for pipe %s, creation time %d";
   public static final String 
FAILED_TO_CREATE_TOPIC_WITH_ATTRIBUTES_RESULT_STATUS =
@@ -143,7 +143,7 @@ public final class ManagerMessages {
   public static final String 
FAILED_TO_DEREGISTER_PIPE_TEMPORARY_META_METRICS_PIPETEMPORARYMETA_DOES_NOT =
       "Failed to deregister pipe temporary meta metrics, PipeTemporaryMeta({}) 
does not exist";
   public static final String FAILED_TO_DROP_PIPE_RESULT_STATUS =
-      "Failed to drop pipe {}. Result status: {}.";
+      "删除 pipe {} 失败。结果状态:{}。";
   public static final String FAILED_TO_GET_ALL_PIPE_INFO = "Failed to get all 
pipe info.";
   public static final String FAILED_TO_GET_ALL_SUBSCRIPTION_INFO =
       "Failed to get all subscription info.";
@@ -162,9 +162,9 @@ public final class ManagerMessages {
   public static final String FAILED_TO_SHOW_SUBSCRIPTION_INFO = "Failed to 
show subscription info.";
   public static final String FAILED_TO_SHOW_TOPIC_INFO = "Failed to show topic 
info.";
   public static final String FAILED_TO_START_PIPE_RESULT_STATUS =
-      "Failed to start pipe {}. Result status: {}.";
+      "启动 pipe {} 失败。结果状态:{}。";
   public static final String FAILED_TO_STOP_PIPE_RESULT_STATUS =
-      "Failed to stop pipe {}. Result status: {}.";
+      "停止 pipe {} 失败。结果状态:{}。";
   public static final String 
FAILED_TO_SUBMIT_ASYNC_CONSENSUS_PIPE_CREATION_FOR =
       "Failed to submit async consensus pipe creation for {}: {}";
   public static final String FAILED_TO_SUBMIT_ASYNC_CONSENSUS_PIPE_DROP_FOR =
@@ -172,7 +172,7 @@ public final class ManagerMessages {
   public static final String FAILED_TO_SYNC_CONSUMER_GROUP_META_RESULT_STATUS =
       "Failed to sync consumer group meta. Result status: {}.";
   public static final String FAILED_TO_SYNC_PIPE_META_RESULT_STATUS =
-      "Failed to sync pipe meta. Result status: {}.";
+      "同步 pipe 元数据失败。结果状态:{}。";
   public static final String 
FAILED_TO_SYNC_TEMPLATE_EXTENSION_INFO_TO_DATANODE =
       "Failed to sync template {} extension info to DataNode {}";
   public static final String FAILED_TO_SYNC_TOPIC_META_RESULT_STATUS =
diff --git 
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
 
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
index 43ccba45cae..096aa07914b 100644
--- 
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
+++ 
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
@@ -142,6 +142,16 @@ public final class DataNodePipeMessages {
       "Failed to persist progress index to configNode, status: {}";
   public static final String FAILURE_WHEN_REGISTER_PIPE_PLUGIN_SKIP_THIS =
       "Failure when register pipe plugin {}. Skip this plugin and continue 
startup.";
+  public static final String
+      FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_NAME_CONFLICTS_WITH_BUILTIN =
+          "Failed to register PipePlugin %s, because the given PipePlugin name 
is the same as a built-in PipePlugin name.";
+  public static final String
+      FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_INSTANCE_CONSTRUCTION_FAILED =
+          "Failed to register PipePlugin %s(%s), because its instance can not 
be constructed successfully. Exception: %s";
+  public static final String 
FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_JAR_MD5_MISMATCH =
+      "Failed to register PipePlugin %s, because existed md5 of jar file for 
pipe plugin %s is different from the new jar file.";
+  public static final String FAILED_TO_DEREGISTER_BUILTIN_PIPE_PLUGIN =
+      "Failed to deregister builtin PipePlugin %s.";
   public static final String PIPECONNECTOR = "PipeConnector: ";
   public static final String 
PIPEDATANODETASKBUILDER_FAILED_TO_PARSE_INCLUSION_AND_EXCLUSION =
       "PipeDataNodeTaskBuilder failed to parse 'inclusion' and 'exclusion' 
parameters: {}";
@@ -439,8 +449,15 @@ public final class DataNodePipeMessages {
   public static final String FAILED_TO_START_SOURCES = "failed to start 
sources.";
   public static final String HEARTBEAT_EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE =
       "Heartbeat Event {} can not be supplied because the reference count can 
not be increased";
+  public static final String EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE_DATA_IS_LOST =
+      "Event %s can not be supplied because the reference count can not be 
increased, the data represented by this event is lost";
   public static final String INTERRUPTED_WAITING_FOR_PROCESSOR_TO_STOP =
       "Interrupted waiting for processor to stop";
+  public static final String 
INTERRUPTED_WHEN_WAITING_FOR_PARSING_PRIVILEGE_FOR_TSFILE =
+      "Interrupted when waiting for parsing privilege for TsFile %s.";
+  public static final String PARSE_TSFILE_WHEN_CHECKING_PRIVILEGE_ERROR =
+      "Parse TsFile %s when checking privilege error. Because: %s";
+  public static final String READ_TSFILE_ERROR = "Read TsFile %s error.";
   public static final String 
IOTDBSCHEMAREGIONSOURCE_DOES_NOT_SUPPORT_TRANSFERRING_EVENTS_UNDER =
       "IoTDBSchemaRegionSource does not support transferring events under 
simple consensus";
   public static final String NOT_HAS_PRIVILEGE_TO_TRANSFER_EVENT =
@@ -826,6 +843,8 @@ public final class DataNodePipeMessages {
   public static final String REDIRECT_FILE_POSITION_TO = "Redirect file 
position to {}.";
   public static final String REDIRECT_TO_POSITION_IN_TRANSFERRING_TSFILE =
       "Redirect to position {} in transferring tsFile {}.";
+  public static final String NETWORK_FAILED_TO_RECEIVE_TSFILE_STATUS =
+      "Network failed to receive tsFile %s, status: %s";
   public static final String SECURITY_DIR = "security dir: {}";
   public static final String SECURITY_PKI_DIR = "security pki dir: {}";
   public static final String SUCCESSFULLY_ADDED_ITEM = "Successfully added 
item {}.";
diff --git 
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
 
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
index adb27e50901..4d514c19ba5 100644
--- 
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
+++ 
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
@@ -84,7 +84,7 @@ public final class DataNodePipeMessages {
   // ===================== AGENT =====================
 
   public static final String ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A =
-      "Attempt to report pipe exception to a null PipeTaskMeta.";
+      "尝试向空的 PipeTaskMeta 上报 pipe 异常。";
   public static final String CANNOT_PARSE_REBOOT_TIMES_FROM_FILE_SET =
       "无法解析 reboot times from file {}, set the current time in seconds ({}) as 
the reboot times";
   public static final String CANNOT_RECORD_REBOOT_TIMES_TO_FILE_THE =
@@ -92,11 +92,11 @@ public final class DataNodePipeMessages {
   public static final String 
CANNOT_START_SIMPLEPROGRESSINDEXASSIGNER_BECAUSE_OF =
       "无法启动 SimpleProgressIndexAssigner because of {}";
   public static final String CREATE_PIPE_DN_TASK_SUCCESSFULLY_WITHIN_MS =
-      "创建 pipe DN task {} successfully within {} ms";
+      "创建 pipe DN task {} 成功,耗时 {} ms";
   public static final String 
DEREGISTER_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT =
-      "Deregister subtask {}. runningTaskCount: {}, registeredTaskCount: {}";
+      "注销子任务 {}。runningTaskCount: {}, registeredTaskCount: {}";
   public static final String DROP_PIPE_DN_TASK_SUCCESSFULLY_WITHIN_MS =
-      "Drop pipe DN task {} successfully within {} ms";
+      "删除 pipe DN task {} 成功,耗时 {} ms";
   public static final String 
ERROR_OCCURRED_WHEN_COLLECTING_EVENTS_FROM_PROCESSOR =
       "collecting events from processor 时发生错误";
   public static final String 
EXCEPTION_IN_PIPE_EVENT_PROCESSING_IGNORED_BECAUSE =
@@ -132,9 +132,19 @@ public final class DataNodePipeMessages {
       "获取 pipe task meta from config node. Ignore the exception 失败,原因:config 
node may not be "
           + "ready yet, and meta will be pushed by config node later.";
   public static final String FAILED_TO_PERSIST_PROGRESS_INDEX_TO_CONFIGNODE =
-      "persist progress index to configNode, status: {} 失败";
+      "持久化 progress index 到 configNode 失败,状态:{}";
   public static final String FAILURE_WHEN_REGISTER_PIPE_PLUGIN_SKIP_THIS =
-      "Failure when register pipe plugin {}. Skip this plugin and continue 
startup.";
+      "注册 pipe plugin {} 失败。将跳过该插件并继续启动。";
+  public static final String
+      FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_NAME_CONFLICTS_WITH_BUILTIN =
+          "注册 PipePlugin %s 失败,因为给定的 PipePlugin 名称与内置 PipePlugin 名称重复。";
+  public static final String
+      FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_INSTANCE_CONSTRUCTION_FAILED =
+          "注册 PipePlugin %s(%s) 失败,因为其实例无法成功构造。异常:%s";
+  public static final String 
FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_JAR_MD5_MISMATCH =
+      "注册 PipePlugin %s 失败,因为 pipe plugin %s 已存在的 jar 文件 MD5 与新的 jar 文件不同。";
+  public static final String FAILED_TO_DEREGISTER_BUILTIN_PIPE_PLUGIN =
+      "注销内置 PipePlugin %s 失败。";
   public static final String PIPECONNECTOR = "PipeConnector: ";
   public static final String 
PIPEDATANODETASKBUILDER_FAILED_TO_PARSE_INCLUSION_AND_EXCLUSION =
       "PipeDataNodeTaskBuilder failed to parse 'inclusion' and 'exclusion' 
parameters: {}";
@@ -157,51 +167,51 @@ public final class DataNodePipeMessages {
       "PipeDataNodeTaskBuilder: When the realtime sync is enabled, we enable 
rate limiter in "
           + "sending tsfile by default to reserve disk and network IO for 
realtime sending.";
   public static final String 
PIPEEVENTCOLLECTOR_THE_EVENT_IS_ALREADY_RELEASED_SKIPPING =
-      "PipeEventCollector: The event {} is already released, skipping it.";
+      "PipeEventCollector:事件 {} 已被释放,跳过处理。";
   public static final String PIPE_CONNECTOR_SUBTASK_WAS_CLOSED_WITHIN_MS =
       "Pipe:connector subtask {} ({}) 已关闭 within {} ms";
-  public static final String PIPE_META_NOT_FOUND = "Pipe meta not found: ";
+  public static final String PIPE_META_NOT_FOUND = "未找到 pipe 元数据:";
   public static final String PIPE_SINK_SUBTASKS_WITH_ATTRIBUTES_IS_BOUNDED =
       "Pipe sink subtasks with attributes {} is bounded with sinkExecutor {} 
and "
           + "callbackExecutor {}.";
   public static final String PIPE_SKIPPING_TEMPORARY_TSFILE_WHICH_SHOULDN_T =
-      "Pipe skipping temporary TsFile which shouldn't be transferred: {}";
+      "Pipe 跳过不应传输的临时 TsFile:{}";
   public static final String PULLED_PIPE_META_FROM_CONFIG_NODE_RECOVERING =
-      "Pulled pipe meta from config node: {}, recovering ...";
+      "已从 config node 拉取 pipe 元数据:{},正在恢复 ...";
   public static final String RECEIVED_PIPE_HEARTBEAT_REQUEST_FROM_CONFIG_NODE =
-      "Received pipe heartbeat request {} from config node.";
+      "收到来自 config node 的 pipe 心跳请求 {}。";
   public static final String 
REGION_NO_TSFILEINSERTIONEVENTS_TO_REPLACE_FOR_SOURCE =
       "Region {}: No TsFileInsertionEvents to replace for source files {}";
   public static final String REGION_REPLACED_TSFILEINSERTIONEVENTS_WITH =
       "Region {}: Replaced TsFileInsertionEvents {} with {}";
-  public static final String REGISTEREDTASKCOUNT_0 = "registeredTaskCount < 0";
-  public static final String REGISTEREDTASKCOUNT_0_1 = "registeredTaskCount <= 
0";
+  public static final String REGISTEREDTASKCOUNT_0 = "registeredTaskCount 小于 
0";
+  public static final String REGISTEREDTASKCOUNT_0_1 = "registeredTaskCount 
小于等于 0";
   public static final String 
REGISTER_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT =
-      "Register subtask {}. runningTaskCount: {}, registeredTaskCount: {}";
+      "注册子任务 {}。runningTaskCount: {}, registeredTaskCount: {}";
   public static final String 
REPORT_PIPERUNTIMEEXCEPTION_TO_LOCAL_PIPETASKMETA_EXCEPTION_MESSAGE =
-      "Report PipeRuntimeException to local PipeTaskMeta({}), exception 
message: {}";
-  public static final String RUNNINGTASKCOUNT_0 = "runningTaskCount < 0";
-  public static final String RUNNINGTASKCOUNT_0_1 = "runningTaskCount <= 0";
+      "向本地 PipeTaskMeta({}) 上报 PipeRuntimeException,异常信息:{}";
+  public static final String RUNNINGTASKCOUNT_0 = "runningTaskCount 小于 0";
+  public static final String RUNNINGTASKCOUNT_0_1 = "runningTaskCount 小于等于 0";
   public static final String 
SIMPLEPROGRESSINDEXASSIGNER_STARTED_SUCCESSFULLY_ISSIMPLECONSENSUSENABLE_R =
-      "SimpleProgressIndexAssigner started successfully. 
isSimpleConsensusEnable: {}, "
+      "SimpleProgressIndexAssigner 启动成功。isSimpleConsensusEnable: {}, "
           + "rebootTimes: {}";
   public static final String STARTING_SIMPLEPROGRESSINDEXASSIGNER =
-      "Starting SimpleProgressIndexAssigner ...";
+      "正在启动 SimpleProgressIndexAssigner ...";
   public static final String START_PIPE_DN_TASK_SUCCESSFULLY_WITHIN_MS =
-      "Start pipe DN task {} successfully within {} ms";
+      "启动 pipe DN task {} 成功,耗时 {} ms";
   public static final String 
START_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT =
-      "Start subtask {}. runningTaskCount: {}, registeredTaskCount: {}";
+      "启动子任务 {}。runningTaskCount: {}, registeredTaskCount: {}";
   public static final String STOP_PIPE_DN_TASK_SUCCESSFULLY_WITHIN_MS =
-      "Stop pipe DN task {} successfully within {} ms";
+      "停止 pipe DN task {} 成功,耗时 {} ms";
   public static final String STOP_SUBTASK_RUNNINGTASKCOUNT_REGISTEREDTASKCOUNT 
=
-      "Stop subtask {}. runningTaskCount: {}, registeredTaskCount: {}";
+      "停止子任务 {}。runningTaskCount: {}, registeredTaskCount: {}";
   public static final String SUBTASK_IS_CLOSED_IGNORE_EXCEPTION =
       "subtask {} 已关闭, ignore exception";
-  public static final String SUBTASK_WORKER_IS_INTERRUPTED = "subtask worker 
is interrupted";
+  public static final String SUBTASK_WORKER_IS_INTERRUPTED = "子任务工作线程被中断";
   public static final String SUCCESSFULLY_PERSISTED_ALL_PIPE_S_INFO_TO =
       "成功 persisted all pipe's info to configNode。";
   public static final String THE_EXECUTOR_AND_HAS_BEEN_SUCCESSFULLY_SHUTDOWN =
-      "The executor {} and {} has been successfully shutdown.";
+      "执行器 {} 和 {} 已成功关闭。";
 
   // ===================== EVENT =====================
 
@@ -422,9 +432,16 @@ public final class DataNodePipeMessages {
       "加载 snapshot from byteBuffer {} 失败。";
   public static final String FAILED_TO_START_SOURCES = "启动 sources 失败。";
   public static final String HEARTBEAT_EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE =
-      "Heartbeat Event {} can not be supplied because the reference count can 
not be increased";
+      "Heartbeat Event {} 无法被提供,因为其引用计数无法增加";
+  public static final String EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE_DATA_IS_LOST =
+      "Event %s 无法被提供,因为其引用计数无法增加,事件代表的数据已经丢失";
   public static final String INTERRUPTED_WAITING_FOR_PROCESSOR_TO_STOP =
-      "Interrupted waiting for processor to stop";
+      "等待 processor 停止时被中断";
+  public static final String 
INTERRUPTED_WHEN_WAITING_FOR_PARSING_PRIVILEGE_FOR_TSFILE =
+      "等待解析 TsFile %s 的权限信息时被中断。";
+  public static final String PARSE_TSFILE_WHEN_CHECKING_PRIVILEGE_ERROR =
+      "检查权限时解析 TsFile %s 失败。原因:%s";
+  public static final String READ_TSFILE_ERROR = "读取 TsFile %s 失败。";
   public static final String 
IOTDBSCHEMAREGIONSOURCE_DOES_NOT_SUPPORT_TRANSFERRING_EVENTS_UNDER =
       "IoTDBSchemaRegionSource 不支持 transferring events under simple consensus";
   public static final String NOT_HAS_PRIVILEGE_TO_TRANSFER_EVENT = "没有权限 
transfer event: ";
@@ -829,6 +846,8 @@ public final class DataNodePipeMessages {
           + "Peeked event: {}, polled event: {}.";
   public static final String THE_FILE_IS_NOT_FOUND_MAY_ALREADY =
       "The file {} is not found, may already be deleted.";
+  public static final String NETWORK_FAILED_TO_RECEIVE_TSFILE_STATUS =
+      "网络接收 TsFile %s 失败,状态:%s";
   public static final String THE_PIPE_WAS_DROPPED_SO_THE_EVENT =
       "The pipe {} was dropped so the event ack {} will be ignored.";
   public static final String THE_PIPE_WAS_DROPPED_SO_THE_EVENT_1 =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
index 70226b35dc7..3eb48b2c04b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
@@ -100,8 +100,8 @@ public class PipeDataNodePluginAgent {
     if (information.isBuiltin()) {
       String errorMessage =
           String.format(
-              "Failed to register PipePlugin %s, because "
-                  + "the given PipePlugin name is the same as a built-in 
PipePlugin name.",
+              DataNodePipeMessages
+                  
.FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_NAME_CONFLICTS_WITH_BUILTIN,
               pluginName);
       LOGGER.warn(errorMessage);
       throw new PipeException(errorMessage);
@@ -113,10 +113,9 @@ public class PipeDataNodePluginAgent {
         && 
!PipePluginExecutableManager.getInstance().isLocalJarMatched(pipePluginMeta)) {
       String errMsg =
           String.format(
-              "Failed to register PipePlugin %s, because "
-                  + "existed md5 of jar file for pipe plugin %s "
-                  + "is different from the new jar file.",
-              pluginName, pluginName);
+              
DataNodePipeMessages.FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_JAR_MD5_MISMATCH,
+              pluginName,
+              pluginName);
       LOGGER.warn(errMsg);
       throw new PipeException(errMsg);
     }
@@ -170,9 +169,11 @@ public class PipeDataNodePluginAgent {
         | ClassCastException e) {
       String errorMessage =
           String.format(
-              "Failed to register PipePlugin %s(%s), because "
-                  + "its instance can not be constructed successfully. 
Exception: %s",
-              pluginName.toUpperCase(), className, e);
+              DataNodePipeMessages
+                  
.FAILED_TO_REGISTER_PIPE_PLUGIN_BECAUSE_INSTANCE_CONSTRUCTION_FAILED,
+              pluginName.toUpperCase(),
+              className,
+              e);
       LOGGER.warn(errorMessage, e);
       throw new PipeException(errorMessage);
     }
@@ -210,7 +211,8 @@ public class PipeDataNodePluginAgent {
 
       if (information != null && information.isBuiltin()) {
         String errorMessage =
-            String.format("Failed to deregister builtin PipePlugin %s.", 
pluginName);
+            String.format(
+                DataNodePipeMessages.FAILED_TO_DEREGISTER_BUILTIN_PIPE_PLUGIN, 
pluginName);
         LOGGER.warn(errorMessage);
         throw new PipeException(errorMessage);
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index cce3ac29b37..6d4c3580bd2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -610,11 +610,12 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
       final String errorMsg =
           e instanceof InterruptedException
               ? String.format(
-                  "Interrupted when waiting for parsing privilege for TsFile 
%s.",
+                  
DataNodePipeMessages.INTERRUPTED_WHEN_WAITING_FOR_PARSING_PRIVILEGE_FOR_TSFILE,
                   resource.getTsFilePath())
               : String.format(
-                  "Parse TsFile %s when checking privilege error. Because: %s",
-                  resource.getTsFilePath(), e.getMessage());
+                  
DataNodePipeMessages.PARSE_TSFILE_WHEN_CHECKING_PRIVILEGE_ERROR,
+                  resource.getTsFilePath(),
+                  e.getMessage());
       LOGGER.warn(errorMsg, e);
       throw new PipeException(errorMsg, e);
     } finally {
@@ -861,7 +862,8 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
     } catch (final Exception e) {
       close();
 
-      final String errorMsg = String.format("Read TsFile %s error.", 
tsFile.getPath());
+      final String errorMsg =
+          String.format(DataNodePipeMessages.READ_TSFILE_ERROR, 
tsFile.getPath());
       LOGGER.warn(errorMsg, e);
       throw new PipeException(errorMsg, e);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 39a5a19a529..5c38c3a8540 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -199,11 +199,17 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
         return client;
       }
     } catch (final Exception e) {
-      LOGGER.warn(
-          DataNodePipeMessages.FAILED_TO_BORROW_CLIENT_FOR_CACHED_LEADER,
+      PipeLogger.log(
+          ignored ->
+              LOGGER.warn(
+                  
DataNodePipeMessages.FAILED_TO_BORROW_CLIENT_FOR_CACHED_LEADER,
+                  endPoint.getIp(),
+                  endPoint.getPort(),
+                  e),
+          e,
+          "Failed to borrow client %s:%s for cached leader.",
           endPoint.getIp(),
-          endPoint.getPort(),
-          e);
+          endPoint.getPort());
     }
 
     return borrowClient();
@@ -357,11 +363,17 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
           client.close();
           client.invalidateAll();
         } catch (final Exception e) {
-          LOGGER.warn(
-              
DataNodePipeMessages.FAILED_TO_CLOSE_CLIENT_AFTER_HANDSHAKE_FAILURE,
+          PipeLogger.log(
+              ignored ->
+                  LOGGER.warn(
+                      
DataNodePipeMessages.FAILED_TO_CLOSE_CLIENT_AFTER_HANDSHAKE_FAILURE,
+                      targetNodeUrl.getIp(),
+                      targetNodeUrl.getPort(),
+                      e),
+              e,
+              "Failed to close client %s:%s after handshake failure when the 
manager is closed.",
               targetNodeUrl.getIp(),
-              targetNodeUrl.getPort(),
-              e);
+              targetNodeUrl.getPort());
         }
       }
       client.setShouldReturnSelf(true);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java
index a4dba48b7e9..b4766a2dae4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import 
org.apache.iotdb.commons.consensus.iotv2.container.IoTV2GlobalComponentContainer;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.consensus.iotconsensusv2.thrift.TCommitId;
@@ -226,8 +227,13 @@ public class IoTConsensusV2AsyncSink extends IoTDBSink 
implements ConsensusPipeS
           IOTDB_CONFIG.getIotConsensusV2PipelineSize());
     }
     if (transferBuffer.isEmpty()) {
-      LOGGER.info(
-          
DataNodePipeMessages.IOTCONSENSUSV2_CONSENSUSGROUP_TRY_TO_REMOVE_EVENT_AFTER,
+      PipeLogger.log(
+          ignored ->
+              LOGGER.info(
+                  
DataNodePipeMessages.IOTCONSENSUSV2_CONSENSUSGROUP_TRY_TO_REMOVE_EVENT_AFTER,
+                  consensusGroupId,
+                  event),
+          "IoTConsensusV2-ConsensusGroup-%s: try to remove event-%s after 
iotConsensusV2AsyncConnector being closed. Ignore it.",
           consensusGroupId,
           event);
       return;
@@ -240,8 +246,15 @@ public class IoTConsensusV2AsyncSink extends IoTDBSink 
implements ConsensusPipeS
     if (current.equalsInIoTConsensusV2(event)) {
       iterator.remove();
     } else {
-      LOGGER.warn(
-          
DataNodePipeMessages.IOTCONSENSUSV2_CONSENSUSGROUP_EVENT_NOT_FOUND_IN_TRANSFERBUFFER,
+      PipeLogger.log(
+          ignored ->
+              LOGGER.warn(
+                  DataNodePipeMessages
+                      
.IOTCONSENSUSV2_CONSENSUSGROUP_EVENT_NOT_FOUND_IN_TRANSFERBUFFER,
+                  consensusGroupId,
+                  event,
+                  transferBuffer.size()),
+          "IoTConsensusV2-ConsensusGroup-%s: event-%s not found in 
transferBuffer, skip removing. queue size = %s",
           consensusGroupId,
           event,
           transferBuffer.size());
@@ -520,8 +533,14 @@ public class IoTConsensusV2AsyncSink extends IoTDBSink 
implements ConsensusPipeS
         if (System.currentTimeMillis() - retryStartTime > 
TimeUnit.SECONDS.toMillis(20)) {
           // just in case that some events are polled and re-added into queue 
again and again,
           // causing this loop to run forever.
-          LOGGER.warn(
-              
DataNodePipeMessages.IOTCONSENSUSV2_CONSENSUSGROUP_RETRYEVENTQUEUE_IS_NOT_EMPTY_AFTER,
+          PipeLogger.log(
+              ignored ->
+                  LOGGER.warn(
+                      DataNodePipeMessages
+                          
.IOTCONSENSUSV2_CONSENSUSGROUP_RETRYEVENTQUEUE_IS_NOT_EMPTY_AFTER,
+                      consensusGroupId,
+                      retryEventQueue.size()),
+              "IoTConsensusV2-ConsensusGroup-%s: retryEventQueue is not empty 
after 20 seconds. retryQueue size: %s",
               consensusGroupId,
               retryEventQueue.size());
           return;
@@ -535,8 +554,16 @@ public class IoTConsensusV2AsyncSink extends IoTDBSink 
implements ConsensusPipeS
             peekedEvent.getRetryInterval() > 
EnrichedEvent.INITIAL_RETRY_INTERVAL_FOR_IOTV2
                 ? peekedEvent.getRetryInterval()
                 : 0L;
-        LOGGER.info(
-            
DataNodePipeMessages.IOTCONSENSUSV2_CONSENSUSGROUP_RETRY_WITH_INTERVAL_FOR_INDEX,
+        PipeLogger.log(
+            ignored ->
+                LOGGER.info(
+                    DataNodePipeMessages
+                        
.IOTCONSENSUSV2_CONSENSUSGROUP_RETRY_WITH_INTERVAL_FOR_INDEX,
+                    consensusGroupId,
+                    retryInterval,
+                    peekedEvent.getReplicateIndexForIoTV2(),
+                    peekedEvent),
+            "IoTConsensusV2-ConsensusGroup-%s: retry with interval %s for 
index %s %s",
             consensusGroupId,
             retryInterval,
             peekedEvent.getReplicateIndexForIoTV2(),
@@ -553,12 +580,14 @@ public class IoTConsensusV2AsyncSink extends IoTDBSink 
implements ConsensusPipeS
               } else if (peekedEvent instanceof PipeDeleteDataNodeEvent) {
                 retryTransfer((PipeDeleteDataNodeEvent) peekedEvent);
               } else {
-                if (LOGGER.isWarnEnabled()) {
-                  LOGGER.warn(
-                      DataNodePipeMessages
-                          
.IOTCONSENSUSV2ASYNCCONNECTOR_DOES_NOT_SUPPORT_TRANSFER_GENERIC_EVENT,
-                      peekedEvent);
-                }
+                PipeLogger.log(
+                    ignored ->
+                        LOGGER.warn(
+                            DataNodePipeMessages
+                                
.IOTCONSENSUSV2ASYNCCONNECTOR_DOES_NOT_SUPPORT_TRANSFER_GENERIC_EVENT,
+                            peekedEvent),
+                    "IoTConsensusV2AsyncConnector does not support transfer 
generic event: %s.",
+                    peekedEvent);
               }
             },
             retryInterval,
@@ -629,15 +658,28 @@ public class IoTConsensusV2AsyncSink extends IoTDBSink 
implements ConsensusPipeS
 
     boolean res = retryEventQueue.offer(event);
     if (res) {
-      LOGGER.info(
-          DataNodePipeMessages
-              
.IOTCONSENSUSV2_CONSENSUSGROUP_EVENT_REPLICATE_INDEX_TRANSFER_FAILED_1,
+      PipeLogger.log(
+          ignored ->
+              LOGGER.info(
+                  DataNodePipeMessages
+                      
.IOTCONSENSUSV2_CONSENSUSGROUP_EVENT_REPLICATE_INDEX_TRANSFER_FAILED_1,
+                  consensusGroupId,
+                  event,
+                  event.getReplicateIndexForIoTV2()),
+          "IoTConsensusV2-ConsensusGroup-%s: Event %s replicate index %s transfer failed, will be added to retry queue.",
           consensusGroupId,
           event,
           event.getReplicateIndexForIoTV2());
     } else {
-      LOGGER.warn(
-          
DataNodePipeMessages.IOTCONSENSUSV2_CONSENSUSGROUP_EVENT_REPLICATE_INDEX_TRANSFER_FAILED,
+      PipeLogger.log(
+          ignored ->
+              LOGGER.warn(
+                  DataNodePipeMessages
+                      
.IOTCONSENSUSV2_CONSENSUSGROUP_EVENT_REPLICATE_INDEX_TRANSFER_FAILED,
+                  consensusGroupId,
+                  event,
+                  event.getReplicateIndexForIoTV2()),
+          "IoTConsensusV2-ConsensusGroup-%s: Event %s replicate index %s transfer failed, added to retry queue failed, this event will be ignored.",
           consensusGroupId,
           event,
           event.getReplicateIndexForIoTV2());
@@ -676,14 +718,23 @@ public class IoTConsensusV2AsyncSink extends IoTDBSink 
implements ConsensusPipeS
   private void logOnClientException(
       final AsyncIoTConsensusV2ServiceClient client, final Exception e) {
     if (client == null) {
-      LOGGER.warn(THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT, e);
+      PipeLogger.log(
+          ignored -> LOGGER.warn(THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT, e),
+          e,
+          THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT);
     } else {
-      LOGGER.warn(
-          String.format(
-              THRIFT_ERROR_FORMATTER_WITH_ENDPOINT,
-              client.getTEndpoint().getIp(),
-              client.getTEndpoint().getPort()),
-          e);
+      PipeLogger.log(
+          ignored ->
+              LOGGER.warn(
+                  String.format(
+                      THRIFT_ERROR_FORMATTER_WITH_ENDPOINT,
+                      client.getTEndpoint().getIp(),
+                      client.getTEndpoint().getPort()),
+                  e),
+          e,
+          THRIFT_ERROR_FORMATTER_WITH_ENDPOINT,
+          client.getTEndpoint().getIp(),
+          client.getTEndpoint().getPort());
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2DeleteEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2DeleteEventHandler.java
index 35ba6a89f7a..36dfdffbf42 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2DeleteEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2DeleteEventHandler.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.handler;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.async.AsyncIoTConsensusV2ServiceClient;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.commons.utils.RetryUtils;
 import 
org.apache.iotdb.consensus.iotconsensusv2.thrift.TIoTConsensusV2TransferReq;
 import 
org.apache.iotdb.consensus.iotconsensusv2.thrift.TIoTConsensusV2TransferResp;
@@ -102,12 +103,19 @@ public class IoTConsensusV2DeleteEventHandler
 
   @Override
   public void onError(Exception e) {
-    LOGGER.warn(
-        
DataNodePipeMessages.FAILED_TO_TRANSFER_PIPEDELETENODEEVENT_COMMITTER_KEY_REPLICATE,
+    PipeLogger.log(
+        ignored ->
+            LOGGER.warn(
+                
DataNodePipeMessages.FAILED_TO_TRANSFER_PIPEDELETENODEEVENT_COMMITTER_KEY_REPLICATE,
+                event.coreReportMessage(),
+                event.getCommitterKey(),
+                event.getReplicateIndexForIoTV2(),
+                e),
+        e,
+        "Failed to transfer PipeDeleteNodeEvent %s (committer key=%s, replicate index=%s).",
         event.coreReportMessage(),
         event.getCommitterKey(),
-        event.getReplicateIndexForIoTV2(),
-        e);
+        event.getReplicateIndexForIoTV2());
 
     if (RetryUtils.needRetryWithIncreasingInterval(e)) {
       // just in case for overflow
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TabletBatchEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TabletBatchEventHandler.java
index b2026c809dd..b107b7fe064 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TabletBatchEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TabletBatchEventHandler.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.handler;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.async.AsyncIoTConsensusV2ServiceClient;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import 
org.apache.iotdb.consensus.iotconsensusv2.thrift.TIoTConsensusV2BatchTransferReq;
 import 
org.apache.iotdb.consensus.iotconsensusv2.thrift.TIoTConsensusV2BatchTransferResp;
 import 
org.apache.iotdb.consensus.iotconsensusv2.thrift.TIoTConsensusV2TransferResp;
@@ -117,17 +118,26 @@ public class IoTConsensusV2TabletBatchEventHandler
 
   @Override
   public void onError(final Exception exception) {
-    LOGGER.warn(
-        
DataNodePipeMessages.IOTCONSENSUSV2_FAILED_TO_TRANSFER_TABLETINSERTIONEVENT_BATCH_TOTAL,
-        events.size(),
+    final Object pipeNames =
         events.stream()
             .map(
                 event ->
                     event instanceof EnrichedEvent
                         ? ((EnrichedEvent) event).getPipeName()
                         : "UNKNOWN")
-            .collect(Collectors.toSet()),
-        exception);
+            .collect(Collectors.toSet());
+    PipeLogger.log(
+        ignored ->
+            LOGGER.warn(
+                DataNodePipeMessages
+                    
.IOTCONSENSUSV2_FAILED_TO_TRANSFER_TABLETINSERTIONEVENT_BATCH_TOTAL,
+                events.size(),
+                pipeNames,
+                exception),
+        exception,
+        "IoTConsensusV2: Failed to transfer TabletInsertionEvent batch. Total failed events: %s, related pipe names: %s",
+        events.size(),
+        pipeNames);
 
     connector.addFailureEventsToRetryQueue(events);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TabletInsertionEventHandler.java
index 4c31942692d..a74e334d5e5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TabletInsertionEventHandler.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.handler;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.async.AsyncIoTConsensusV2ServiceClient;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.commons.utils.RetryUtils;
 import 
org.apache.iotdb.consensus.iotconsensusv2.thrift.TIoTConsensusV2TransferReq;
 import 
org.apache.iotdb.consensus.iotconsensusv2.thrift.TIoTConsensusV2TransferResp;
@@ -113,12 +114,20 @@ public abstract class 
IoTConsensusV2TabletInsertionEventHandler<
   @Override
   public void onError(Exception exception) {
     EnrichedEvent event = (EnrichedEvent) this.event;
-    LOGGER.warn(
-        
DataNodePipeMessages.FAILED_TO_TRANSFER_TABLETINSERTIONEVENT_COMMITTER_KEY_REPLICATE,
+    PipeLogger.log(
+        ignored ->
+            LOGGER.warn(
+                DataNodePipeMessages
+                    
.FAILED_TO_TRANSFER_TABLETINSERTIONEVENT_COMMITTER_KEY_REPLICATE,
+                event.coreReportMessage(),
+                event.getCommitterKey(),
+                event.getReplicateIndexForIoTV2(),
+                exception),
+        exception,
+        "Failed to transfer TabletInsertionEvent %s (committer key=%s, replicate index=%s).",
         event.coreReportMessage(),
         event.getCommitterKey(),
-        event.getReplicateIndexForIoTV2(),
-        exception);
+        event.getReplicateIndexForIoTV2());
 
     if (RetryUtils.needRetryWithIncreasingInterval(exception)) {
       // just in case for overflow
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java
index 490806cfddb..4e269aaa7e8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.async.AsyncIoTConsensusV2ServiceClient;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import 
org.apache.iotdb.commons.pipe.sink.payload.iotconsensusv2.response.IoTConsensusV2TransferFilePieceResp;
 import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.consensus.iotconsensusv2.thrift.TCommitId;
@@ -290,13 +291,22 @@ public class IoTConsensusV2TsFileInsertionEventHandler
 
   @Override
   public void onError(final Exception exception) {
-    LOGGER.warn(
-        
DataNodePipeMessages.IOTCONSENSUSV2_FAILED_TO_TRANSFER_TSFILEINSERTIONEVENT_COMMITTER_KEY,
+    PipeLogger.log(
+        ignored ->
+            LOGGER.warn(
+                DataNodePipeMessages
+                    
.IOTCONSENSUSV2_FAILED_TO_TRANSFER_TSFILEINSERTIONEVENT_COMMITTER_KEY,
+                consensusPipeName,
+                tsFile,
+                event.getCommitterKey(),
+                event.getReplicateIndexForIoTV2(),
+                exception),
+        exception,
+        "IoTConsensusV2-%s: Failed to transfer TsFileInsertionEvent %s (committer key %s, replicate index %s).",
         consensusPipeName,
         tsFile,
         event.getCommitterKey(),
-        event.getReplicateIndexForIoTV2(),
-        exception);
+        event.getReplicateIndexForIoTV2());
 
     if (RetryUtils.needRetryWithIncreasingInterval(exception)) {
       // just in case for overflow
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
index 5ae7942d201..c0a9eb6a79d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
@@ -441,7 +441,8 @@ public class IoTDBLegacyPipeSink implements PipeConnector {
               
DataNodePipeMessages.REDIRECT_TO_POSITION_IN_TRANSFERRING_TSFILE, position, 
file);
         } else if (status.code == 
TSStatusCode.SYNC_FILE_ERROR.getStatusCode()) {
           final String errorMsg =
-              String.format("Network failed to receive tsFile %s, status: %s", 
file, status);
+              String.format(
+                  
DataNodePipeMessages.NETWORK_FAILED_TO_RECEIVE_TSFILE_STATUS, file, status);
           LOGGER.warn(errorMsg);
           throw new PipeConnectionException(errorMsg);
         }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index bd3f06ba778..b8b169b1f6a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -269,7 +269,12 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
                   sealedFile.left));
         }
       } catch (final Exception e) {
-        LOGGER.warn(DataNodePipeMessages.FAILED_TO_TRANSFER_TSFILE_BATCH, 
dbTsFilePairs, e);
+        PipeLogger.log(
+            ignored ->
+                
LOGGER.warn(DataNodePipeMessages.FAILED_TO_TRANSFER_TSFILE_BATCH, 
dbTsFilePairs, e),
+            e,
+            "Failed to transfer tsfile batch (%s).",
+            dbTsFilePairs);
         if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
           addFailureEventsToRetryQueue(events, e);
         }
@@ -461,17 +466,27 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
       } catch (final Exception e) {
         if (e instanceof InterruptedException) {
           Thread.currentThread().interrupt();
-          LOGGER.warn(
-              
DataNodePipeMessages.TRANSFER_TSFILE_EVENT_ASYNCHRONOUSLY_WAS_INTERRUPTED,
-              pipeTransferTsFileHandler.getTsFile(),
-              e);
+          PipeLogger.log(
+              ignored ->
+                  LOGGER.warn(
+                      
DataNodePipeMessages.TRANSFER_TSFILE_EVENT_ASYNCHRONOUSLY_WAS_INTERRUPTED,
+                      pipeTransferTsFileHandler.getTsFile(),
+                      e),
+              e,
+              "Transfer tsfile event %s asynchronously was interrupted.",
+              pipeTransferTsFileHandler.getTsFile());
         }
 
         pipeTransferTsFileHandler.onError(e);
-        LOGGER.warn(
-            
DataNodePipeMessages.FAILED_TO_TRANSFER_TSFILE_EVENT_ASYNCHRONOUSLY,
-            pipeTransferTsFileHandler.getTsFile(),
-            e);
+        PipeLogger.log(
+            ignored ->
+                LOGGER.warn(
+                    
DataNodePipeMessages.FAILED_TO_TRANSFER_TSFILE_EVENT_ASYNCHRONOUSLY,
+                    pipeTransferTsFileHandler.getTsFile(),
+                    e),
+            e,
+            "Failed to transfer tsfile event %s asynchronously.",
+            pipeTransferTsFileHandler.getTsFile());
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index eaf9f0b73e4..d543e736743 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler;
 
 import org.apache.iotdb.commons.client.ThriftClient;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqBuilder;
 import org.apache.iotdb.db.i18n.DataNodePipeMessages;
 import 
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
@@ -106,7 +107,10 @@ public abstract class PipeTransferTrackableHandler
       client.returnSelf(
           (e) -> {
             if (e instanceof IllegalStateException) {
-              
LOGGER.info(DataNodePipeMessages.ILLEGAL_STATE_WHEN_RETURN_THE_CLIENT_TO);
+              PipeLogger.log(
+                  ignored ->
+                      
LOGGER.info(DataNodePipeMessages.ILLEGAL_STATE_WHEN_RETURN_THE_CLIENT_TO),
+                  "Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore.");
               return true;
             }
             return false;
@@ -139,9 +143,9 @@ public abstract class PipeTransferTrackableHandler
       return;
     }
 
-    LOGGER.warn(
-        "The body size of the request is too large. The request will be sliced. Origin req: {}-{}. "
-            + "Request body size: {}, threshold: {}",
+    PipeLogger.log(
+        LOGGER::warn,
+        "The body size of the request is too large. The request will be sliced. Origin req: %s-%s. Request body size: %s, threshold: %s",
         req.getVersion(),
         req.getType(),
         req.body.limit(),
@@ -242,11 +246,12 @@ public abstract class PipeTransferTrackableHandler
       final TPipeTransferReq originalReq,
       final boolean shouldReturnSelf,
       final Exception exception) {
-    LOGGER.warn(
-        "Failed to transfer slice. Origin req: {}-{}. Retry the whole transfer.",
+    PipeLogger.log(
+        LOGGER::warn,
+        exception,
+        "Failed to transfer slice. Origin req: %s-%s. Retry the whole transfer.",
         originalReq.getVersion(),
-        originalReq.getType(),
-        exception);
+        originalReq.getType());
 
     try {
       client.setShouldReturnSelf(shouldReturnSelf);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 0eb226c4639..3fa5e557a20 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -161,8 +161,13 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     this.client = client;
 
     if (client == null) {
-      LOGGER.warn(
-          DataNodePipeMessages.CLIENT_HAS_BEEN_RETURNED_TO_THE_POOL,
+      PipeLogger.log(
+          ignored ->
+              LOGGER.warn(
+                  DataNodePipeMessages.CLIENT_HAS_BEEN_RETURNED_TO_THE_POOL,
+                  sink.isClosed() ? "CLOSED" : "NOT CLOSED",
+                  tsFile),
+          "Client has been returned to the pool. Connector is %s. TsFile: %s.",
           sink.isClosed() ? "CLOSED" : "NOT CLOSED",
           tsFile);
       return;
@@ -429,7 +434,10 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     client.returnSelf(
         (e) -> {
           if (e instanceof IllegalStateException) {
-            
LOGGER.info(DataNodePipeMessages.ILLEGAL_STATE_WHEN_RETURN_THE_CLIENT_TO);
+            PipeLogger.log(
+                ignored ->
+                    
LOGGER.info(DataNodePipeMessages.ILLEGAL_STATE_WHEN_RETURN_THE_CLIENT_TO),
+                "Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore.");
             return true;
           }
           return false;
@@ -442,8 +450,13 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
       final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq 
req)
       throws TException {
     if (client == null) {
-      LOGGER.warn(
-          DataNodePipeMessages.CLIENT_HAS_BEEN_RETURNED_TO_THE_POOL,
+      PipeLogger.log(
+          ignored ->
+              LOGGER.warn(
+                  DataNodePipeMessages.CLIENT_HAS_BEEN_RETURNED_TO_THE_POOL,
+                  sink.isClosed() ? "CLOSED" : "NOT CLOSED",
+                  tsFile),
+          "Client has been returned to the pool. Connector is %s. TsFile: %s.",
           sink.isClosed() ? "CLOSED" : "NOT CLOSED",
           tsFile);
       return;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
index 9af44d54578..c219acbc697 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
@@ -279,9 +279,7 @@ public class PipeRealtimeDataRegionHybridSource extends 
PipeRealtimeDataRegionSo
       // event and report the exception to PipeRuntimeAgent.
       final String errorMessage =
           String.format(
-              "TsFile Event %s can not be supplied because "
-                  + "the reference count can not be increased, "
-                  + "the data represented by this event is lost",
+              
DataNodePipeMessages.EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE_DATA_IS_LOST,
               event.getEvent());
       LOGGER.error(errorMessage);
       PipeDataNodeAgent.runtime()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
index 3d9c81bcb0c..2835da02d83 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.source.dataregion.realtime;
 
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
 import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
+import org.apache.iotdb.db.i18n.DataNodePipeMessages;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -111,9 +112,7 @@ public class PipeRealtimeDataRegionLogSource extends 
PipeRealtimeDataRegionSourc
         // and report the exception to PipeRuntimeAgent.
         final String errorMessage =
             String.format(
-                "Event %s can not be supplied because "
-                    + "the reference count can not be increased, "
-                    + "the data represented by this event is lost",
+                
DataNodePipeMessages.EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE_DATA_IS_LOST,
                 realtimeEvent.getEvent());
         LOGGER.error(errorMessage);
         PipeDataNodeAgent.runtime()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index 772154ba07f..b13b2040016 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -504,9 +504,7 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
     } else {
       // This would not happen, but just in case.
       LOGGER.error(
-          "Heartbeat Event {} can not be supplied because "
-              + "the reference count can not be increased",
-          event.getEvent());
+          DataNodePipeMessages.HEARTBEAT_EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE, 
event.getEvent());
 
       // Do not report exception since the PipeHeartbeatEvent doesn't affect
       // the correction of pipe progress.
@@ -524,9 +522,7 @@ public abstract class PipeRealtimeDataRegionSource 
implements PipeExtractor {
       // event and report the exception to PipeRuntimeAgent.
       final String errorMessage =
           String.format(
-              "Event %s can not be supplied because "
-                  + "the reference count can not be increased, "
-                  + "the data represented by this event is lost",
+              
DataNodePipeMessages.EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE_DATA_IS_LOST,
               event.getEvent());
       LOGGER.error(errorMessage);
       PipeDataNodeAgent.runtime()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
index d70d93db548..73bef31d85b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.source.dataregion.realtime;
 
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
 import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
+import org.apache.iotdb.db.i18n.DataNodePipeMessages;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -96,9 +97,7 @@ public class PipeRealtimeDataRegionTsFileSource extends 
PipeRealtimeDataRegionSo
         // and report the exception to PipeRuntimeAgent.
         final String errorMessage =
             String.format(
-                "Event %s can not be supplied because "
-                    + "the reference count can not be increased, "
-                    + "the data represented by this event is lost",
+                
DataNodePipeMessages.EVENT_CAN_NOT_BE_SUPPLIED_BECAUSE_DATA_IS_LOST,
                 realtimeEvent.getEvent());
         LOGGER.error(errorMessage);
         PipeDataNodeAgent.runtime()
diff --git 
a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/PipeMessages.java
 
b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/PipeMessages.java
index 482717e8b04..f0086f26898 100644
--- 
a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/PipeMessages.java
+++ 
b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/PipeMessages.java
@@ -395,6 +395,20 @@ public final class PipeMessages {
           + "because of {}. Will retry forever.";
   public static final String RETRY_EXECUTING_SUBTASK_FOREVER =
       "Retry executing subtask {} (creation time: {}, simple class: {}), retry 
count {}, last exception: {}";
+  public static final String 
PATTERN_INCLUSION_CANNOT_BE_USED_WITH_PATTERN_OR_PATH =
+      "Pipe: %s cannot be used together with %s or %s.";
+  public static final String 
PATTERN_INCLUSION_CANNOT_BE_USED_WITH_PATH_EXCLUSION =
+      "Pipe: %s cannot be used together with %s.";
+  public static final String PATH_AND_PATTERN_CANNOT_BE_USED_TOGETHER =
+      "Pipe: %s and %s cannot be used together.";
+  public static final String PARAMETER_ONLY_SUPPORTS_SINGLE_PATTERN =
+      "Pipe: The parameter %s only supports a single pattern now.";
+  public static final String FAILED_TO_PERFORM_PATTERN_COVERAGE_CHECK =
+      "Pipe: Failed to perform pattern coverage check for inclusion [{}] and exclusion [{}].";
+  public static final String EXCLUSION_PATTERN_FULLY_COVERS_INCLUSION_PATTERN =
+      "Pipe: The provided exclusion pattern fully covers the inclusion pattern. This pipe pattern will match nothing. Inclusion: [%s], Exclusion: [%s]";
+  public static final String EXCLUSION_PATTERN_COVERS_PART_OF_INCLUSION_PATHS =
+      "Pipe: The provided exclusion pattern covers {} out of {} inclusion paths. These paths will be excluded. Inclusion: [{}], Exclusion: [{}]";
 
   // ===================== PipeAbstractSinkSubtask =====================
 
diff --git 
a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/PipeMessages.java
 
b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/PipeMessages.java
index eec9a123170..d081a8fd4cf 100644
--- 
a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/PipeMessages.java
+++ 
b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/PipeMessages.java
@@ -379,6 +379,20 @@ public final class PipeMessages {
       "执行子任务 {}(创建时间:{},类名:{})失败,原因:{}。将无限重试。";
   public static final String RETRY_EXECUTING_SUBTASK_FOREVER =
       "重试执行子任务 {}(创建时间:{},类名:{}),重试次数 {},上次异常:{}";
+  public static final String 
PATTERN_INCLUSION_CANNOT_BE_USED_WITH_PATTERN_OR_PATH =
+      "Pipe:%s 不能与 %s 或 %s 同时使用。";
+  public static final String 
PATTERN_INCLUSION_CANNOT_BE_USED_WITH_PATH_EXCLUSION =
+      "Pipe:%s 不能与 %s 同时使用。";
+  public static final String PATH_AND_PATTERN_CANNOT_BE_USED_TOGETHER =
+      "Pipe:%s 和 %s 不能同时使用。";
+  public static final String PARAMETER_ONLY_SUPPORTS_SINGLE_PATTERN =
+      "Pipe:参数 %s 当前只支持单个 pattern。";
+  public static final String FAILED_TO_PERFORM_PATTERN_COVERAGE_CHECK =
+      "Pipe:对 inclusion [{}] 和 exclusion [{}] 执行 pattern 覆盖检查失败。";
+  public static final String EXCLUSION_PATTERN_FULLY_COVERS_INCLUSION_PATTERN =
+      "Pipe:给定 exclusion pattern 完全覆盖了 inclusion pattern。该 pipe pattern 不会匹配任何内容。Inclusion: [%s], Exclusion: [%s]";
+  public static final String EXCLUSION_PATTERN_COVERS_PART_OF_INCLUSION_PATHS =
+      "Pipe:给定 exclusion pattern 覆盖了 {} / {} 条 inclusion 路径。这些路径将被排除。Inclusion: [{}], Exclusion: [{}]";
 
   // ===================== PipeAbstractSinkSubtask =====================
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
index 6631f60a65c..48c4c64abaa 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.commons.pipe.datastructure.pattern;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.i18n.PipeMessages;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternUtil;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
@@ -180,8 +181,10 @@ public abstract class TreePattern {
     if (hasPatternInclusionKey && (hasLegacyPathKey || hasLegacyPatternKey)) {
       final String msg =
           String.format(
-              "Pipe: %s cannot be used together with %s or %s.",
-              SOURCE_PATTERN_INCLUSION_KEY, SOURCE_PATTERN_KEY, SOURCE_PATH_KEY);
+              PipeMessages.PATTERN_INCLUSION_CANNOT_BE_USED_WITH_PATTERN_OR_PATH,
+              SOURCE_PATTERN_INCLUSION_KEY,
+              SOURCE_PATTERN_KEY,
+              SOURCE_PATH_KEY);
       LOGGER.warn(msg);
       throw new PipeException(msg);
     }
@@ -219,8 +222,9 @@ public abstract class TreePattern {
             EXTRACTOR_PATH_EXCLUSION_KEY, SOURCE_PATH_EXCLUSION_KEY)) {
       final String msg =
           String.format(
-              "Pipe: %s cannot be used together with %s.",
-              SOURCE_PATTERN_INCLUSION_KEY, SOURCE_PATH_EXCLUSION_KEY);
+              PipeMessages.PATTERN_INCLUSION_CANNOT_BE_USED_WITH_PATH_EXCLUSION,
+              SOURCE_PATTERN_INCLUSION_KEY,
+              SOURCE_PATH_EXCLUSION_KEY);
       LOGGER.warn(msg);
       throw new PipeException(msg);
     }
@@ -256,9 +260,7 @@ public abstract class TreePattern {
     if (inclusionPatterns.isEmpty()) {
       final String msg =
           String.format(
-              "Pipe: The provided exclusion pattern fully covers the inclusion pattern. "
-                  + "This pipe pattern will match nothing. "
-                  + "Inclusion: %s, Exclusion: %s",
+              PipeMessages.EXCLUSION_PATTERN_FULLY_COVERS_INCLUSION_PATTERN,
               sourceParameters.getStringByKeys(
                   EXTRACTOR_PATTERN_INCLUSION_KEY,
                   SOURCE_PATTERN_INCLUSION_KEY,
@@ -399,7 +401,8 @@ public abstract class TreePattern {
 
     if (path != null && pattern != null) {
       final String msg =
-          String.format("Pipe: %s and %s cannot be used together.", pathKeyName, patternKeyName);
+          String.format(
+              PipeMessages.PATH_AND_PATTERN_CANNOT_BE_USED_TOGETHER, pathKeyName, patternKeyName);
       LOGGER.warn(msg);
       throw new PipeException(msg);
     }
@@ -439,7 +442,7 @@ public abstract class TreePattern {
 
     if (!allowMultiple && patterns.size() > 1) {
       final String msg =
-          String.format("Pipe: The parameter %s only supports a single pattern now.", parameterKey);
+          String.format(PipeMessages.PARAMETER_ONLY_SUPPORTS_SINGLE_PATTERN, parameterKey);
       LOGGER.warn(msg);
       throw new PipeException(msg);
     }
@@ -692,7 +695,7 @@ public abstract class TreePattern {
 
     if (!allowMultiple && patterns.size() > 1) {
       final String msg =
-          String.format("Pipe: The parameter %s only supports a single pattern now.", parameterKey);
+          String.format(PipeMessages.PARAMETER_ONLY_SUPPORTS_SINGLE_PATTERN, parameterKey);
       LOGGER.warn(msg);
       throw new PipeException(msg);
     }
@@ -854,7 +857,7 @@ public abstract class TreePattern {
     } catch (final Exception e) {
       // This check is best-effort. Do not fail construction.
       LOGGER.warn(
-          "Pipe: Failed to perform pattern coverage check for inclusion [{}] and exclusion [{}].",
+          PipeMessages.FAILED_TO_PERFORM_PATTERN_COVERAGE_CHECK,
           inclusion.getPattern(),
           exclusion.getPattern(),
           e);
@@ -866,18 +869,15 @@ public abstract class TreePattern {
       // All inclusion paths are covered by the exclusion
       final String msg =
           String.format(
-              "Pipe: The provided exclusion pattern fully covers the inclusion pattern. "
-                  + "This pipe pattern will match nothing. "
-                  + "Inclusion: [%s], Exclusion: [%s]",
-              inclusion.getPattern(), exclusion.getPattern());
+              PipeMessages.EXCLUSION_PATTERN_FULLY_COVERS_INCLUSION_PATTERN,
+              inclusion.getPattern(),
+              exclusion.getPattern());
       LOGGER.warn(msg);
       throw new PipeException(msg);
     } else if (coveredCount > 0) {
       // Some inclusion paths are covered
       LOGGER.warn(
-          "Pipe: The provided exclusion pattern covers {} out of {} inclusion paths. "
-              + "These paths will be excluded. "
-              + "Inclusion: [{}], Exclusion: [{}]",
+          PipeMessages.EXCLUSION_PATTERN_COVERS_PART_OF_INCLUSION_PATHS,
           coveredCount,
           inclusionPaths.size(),
           inclusion.getPattern(),

Reply via email to