This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new aa7b275318c Pipe: Optimize pipe logging with shared PipePeriodicalLogReducer (#17887)
aa7b275318c is described below

commit aa7b275318c71dc905c1367a40a3931ff0e59e81
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Jun 11 17:16:07 2026 +0800

    Pipe: Optimize pipe logging with shared PipePeriodicalLogReducer (#17887)
    
    * Optimize pipe logging with shared PipePeriodicalLogReducer
    
    Move PipePeriodicalLogReducer to node-commons and route ConfigNode and
    DataNode pipe logs through PipeLogger to rate-limit repetitive messages.
    
    * Hot-reload PipePeriodicalLogReducer settings on ConfigNode
    
    Apply pipe log reducer cache updates during ConfigNode configuration reload
    so that pipe_periodical_log_min_interval_seconds and
    pipe_logger_cache_max_size_in_bytes take effect without a restart.
    
    Co-authored-by: Cursor <[email protected]>
    
    * Remove redundant log when pipeMetaFromAgent is null in heartbeat parser.
    
    Co-authored-by: Cursor <[email protected]>
    
    * Fix missing i18n for pipe log messages on ConfigNode.
    
    Move hardcoded heartbeat and extractor-close logs into ManagerMessages
    with zh translations.
    
    Co-authored-by: Cursor <[email protected]>
    
    ---------
    
    Co-authored-by: Cursor <[email protected]>
    Co-authored-by: Caideyipi <[email protected]>
---
 .../iotdb/confignode/i18n/ManagerMessages.java     |  8 +-
 .../iotdb/confignode/i18n/ManagerMessages.java     | 14 +++-
 .../confignode/conf/ConfigNodeDescriptor.java      | 10 ++-
 .../agent/runtime/PipeConfigNodeRuntimeAgent.java  | 17 ++--
 .../pipe/agent/task/PipeConfigNodeSubtask.java     | 21 ++---
 .../pipe/coordinator/runtime/PipeMetaSyncer.java   | 26 ++++--
 .../runtime/heartbeat/PipeHeartbeatParser.java     | 43 +++++-----
 .../runtime/heartbeat/PipeHeartbeatScheduler.java  |  7 +-
 .../pipe/coordinator/task/PipeTaskCoordinator.java |  8 +-
 .../coordinator/task/PipeTaskCoordinatorLock.java  | 10 ++-
 .../confignode/persistence/pipe/PipeTaskInfo.java  |  9 ++-
 .../runtime/PipeHandleLeaderChangeProcedure.java   | 36 ++++++---
 .../runtime/PipeHandleMetaChangeProcedure.java     | 33 +++++---
 .../impl/pipe/runtime/PipeMetaSyncProcedure.java   |  6 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  2 +-
 .../agent/runtime/PipeDataNodeRuntimeAgent.java    | 14 ++--
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  2 +-
 .../resource/log/PipePeriodicalLogReducer.java     | 93 ----------------------
 .../commons/pipe/resource/log/PipeLogger.java      | 20 +++--
 .../resource/log/PipePeriodicalLogReducer.java     | 76 ++++++++++++++++++
 .../commons/pipe/resource/PipeLoggerTest.java      | 14 ++++
 21 files changed, 287 insertions(+), 182 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java
 
b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java
index 5dbfe151576..d3df427120d 100644
--- 
a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java
+++ 
b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java
@@ -63,10 +63,16 @@ public final class ManagerMessages {
   public static final String DECREASE_REFERENCE_COUNT_FOR_SNAPSHOT_ERROR =
       "Decrease reference count for snapshot {} error.";
   public static final String DELETING_REGIONS_COSTS_MS = "Deleting regions 
costs {}ms";
+  public static final String 
DETECTED_HISTORICAL_PIPE_COMPLETION_REPORT_FROM_DATANODE =
+      "Detected historical pipe completion report from DataNode {} for pipe 
{}. remainingEventCount: {}, remainingTime: {}, completedDataNodes: {}";
   public static final String DETECTED_COMPLETION_OF_PIPE_STATIC_META_REMOVE_IT 
=
       "Detected completion of pipe {}, static meta: {}, remove it.";
+  public static final String ALL_DATANODES_REPORTED_HISTORICAL_PIPE_COMPLETED =
+      "All DataNodes reported historical pipe {} completed. 
globalRemainingEventCount: {}, globalRemainingTime: {}, staticMeta: {}";
   public static final String 
DETECT_PIPERUNTIMECRITICALEXCEPTION_FROM_AGENT_STOP_PIPE =
       "Detect PipeRuntimeCriticalException {} from agent, stop pipe {}.";
+  public static final String 
DETECT_PIPERUNTIMESINKCRITICALEXCEPTION_FROM_AGENT_STOP_PIPE =
+      "Detect PipeRuntimeSinkCriticalException {} from agent, stop pipe {}.";
   public static final String ENABLE_SEPARATION_OF_POWERS_IS_NOT_SUPPORTED =
       "Enable separation of powers is not supported";
   public static final String ENDEXECUTECQ_TIME_RANGE_IS_CURRENT_TIME_IS =
@@ -113,7 +119,7 @@ public final class ManagerMessages {
   public static final String 
FAILED_TO_CLOSE_CONSUMER_IN_CONSUMER_GROUP_RESULT_STATUS =
       "Failed to close consumer {} in consumer group {}. Result status: {}.";
   public static final String 
FAILED_TO_CLOSE_EXTRACTOR_AFTER_FAILED_TO_INITIALIZE_EXTRACTOR =
-      "Failed to close extractor after failed to initialize extractor. ";
+      "Failed to close extractor after failed to initialize extractor. Ignore 
this exception.";
   public static final String 
FAILED_TO_CLOSE_SINK_AFTER_FAILED_TO_INITIALIZE_IT_IGNORE =
       "Failed to close sink after failed to initialize it. Ignore this 
exception.";
   public static final String FAILED_TO_COLLECT_COMMITCREATETABLEPLAN =
diff --git 
a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java
 
b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java
index e7321e5376c..5164d8e1229 100644
--- 
a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java
+++ 
b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java
@@ -63,10 +63,16 @@ public final class ManagerMessages {
   public static final String DECREASE_REFERENCE_COUNT_FOR_SNAPSHOT_ERROR =
       "Decrease reference count for snapshot {} error.";
   public static final String DELETING_REGIONS_COSTS_MS = "Deleting regions 
costs {}ms";
+  public static final String 
DETECTED_HISTORICAL_PIPE_COMPLETION_REPORT_FROM_DATANODE =
+      "检测到来自 DataNode {} 的历史 pipe 完成上报,pipe {}。remainingEventCount: {}, 
remainingTime: {}, completedDataNodes: {}";
   public static final String DETECTED_COMPLETION_OF_PIPE_STATIC_META_REMOVE_IT 
=
-      "Detected completion of pipe {}, static meta: {}, remove it.";
+      "检测到 pipe {} 已完成,static meta: {},将其移除。";
+  public static final String ALL_DATANODES_REPORTED_HISTORICAL_PIPE_COMPLETED =
+      "所有 DataNode 均已上报历史 pipe {} 完成。globalRemainingEventCount: {}, 
globalRemainingTime: {}, staticMeta: {}";
   public static final String 
DETECT_PIPERUNTIMECRITICALEXCEPTION_FROM_AGENT_STOP_PIPE =
-      "Detect PipeRuntimeCriticalException {} from agent, stop pipe {}.";
+      "检测到 agent 上报 PipeRuntimeCriticalException {},停止 pipe {}。";
+  public static final String 
DETECT_PIPERUNTIMESINKCRITICALEXCEPTION_FROM_AGENT_STOP_PIPE =
+      "检测到 agent 上报 PipeRuntimeSinkCriticalException {},停止 pipe {}。";
   public static final String ENABLE_SEPARATION_OF_POWERS_IS_NOT_SUPPORTED = 
"不支持启用权力分离";
   public static final String ENDEXECUTECQ_TIME_RANGE_IS_CURRENT_TIME_IS =
       "[EndExecuteCQ] {}, time range is [{}, {}), current time is {}";
@@ -112,9 +118,9 @@ public final class ManagerMessages {
   public static final String 
FAILED_TO_CLOSE_CONSUMER_IN_CONSUMER_GROUP_RESULT_STATUS =
       "Failed to close consumer {} in consumer group {}. Result status: {}.";
   public static final String 
FAILED_TO_CLOSE_EXTRACTOR_AFTER_FAILED_TO_INITIALIZE_EXTRACTOR =
-      "Failed to close extractor after failed to initialize extractor. ";
+      "初始化 extractor 失败后关闭 extractor 失败,忽略此异常。";
   public static final String 
FAILED_TO_CLOSE_SINK_AFTER_FAILED_TO_INITIALIZE_IT_IGNORE =
-      "Failed to close sink after failed to initialize it. Ignore this 
exception.";
+      "初始化 sink 失败后关闭 sink 失败,忽略此异常。";
   public static final String FAILED_TO_COLLECT_COMMITCREATETABLEPLAN =
       "Failed to collect CommitCreateTablePlan";
   public static final String 
FAILED_TO_COLLECT_PIPE_META_LIST_FROM_CONFIG_NODE_TASK =
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 7321431a12f..c23620f9e75 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.commons.conf.ConfigurationFileUtils;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.conf.TrimProperties;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
+import org.apache.iotdb.commons.pipe.config.PipeDescriptor;
+import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer;
 import org.apache.iotdb.commons.schema.SchemaConstant;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.confignode.i18n.ConfigNodeMessages;
@@ -826,12 +828,18 @@ public class ConfigNodeDescriptor {
     }
   }
 
-  public void loadHotModifiedProps(TrimProperties properties) {
+  public void loadHotModifiedProps(TrimProperties properties) throws 
IOException {
     ConfigurationFileUtils.updateAppliedProperties(properties, true);
     Optional.ofNullable(properties.getProperty(IoTDBConstant.CLUSTER_NAME))
         .ifPresent(conf::setClusterName);
     Optional.ofNullable(properties.getProperty("enable_topology_probing"))
         .ifPresent(v -> 
conf.setEnableTopologyProbing(Boolean.parseBoolean(v)));
+    loadPipeHotModifiedProp(properties);
+  }
+
+  private void loadPipeHotModifiedProp(TrimProperties properties) throws 
IOException {
+    PipeDescriptor.loadPipeProps(commonDescriptor.getConfig(), properties, 
true);
+    PipePeriodicalLogReducer.update();
   }
 
   public static ConfigNodeDescriptor getInstance() {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
index ca98f4ec9e8..4726b50d899 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
@@ -26,6 +26,8 @@ import 
org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalPhantomReferenc
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
+import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.confignode.i18n.ManagerMessages;
@@ -56,6 +58,7 @@ public class PipeConfigNodeRuntimeAgent implements IService {
   @Override
   public synchronized void start() {
     PipeConfig.getInstance().printAllConfigs();
+    PipeLogger.setLogger(PipePeriodicalLogReducer::log);
 
     // PipeTasks will not be started here and will be started by 
"HandleLeaderChange"
     // procedure when the consensus layer notify leader ready
@@ -142,19 +145,21 @@ public class PipeConfigNodeRuntimeAgent implements 
IService {
     if (event.getPipeTaskMeta() != null) {
       report(event.getPipeTaskMeta(), pipeRuntimeException);
     } else {
-      LOGGER.warn(
-          
ManagerMessages.ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A_NULL_PIPETASKMETA,
-          pipeRuntimeException);
+      PipeLogger.log(
+          LOGGER::warn,
+          pipeRuntimeException,
+          
ManagerMessages.ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A_NULL_PIPETASKMETA);
     }
   }
 
   private void report(
       final PipeTaskMeta pipeTaskMeta, final PipeRuntimeException 
pipeRuntimeException) {
-    LOGGER.warn(
+    PipeLogger.log(
+        LOGGER::warn,
+        pipeRuntimeException,
         
ManagerMessages.REPORT_PIPERUNTIMEEXCEPTION_TO_LOCAL_PIPETASKMETA_EXCEPTION_MESSAGE,
         pipeTaskMeta,
-        pipeRuntimeException.getMessage(),
-        pipeRuntimeException);
+        pipeRuntimeException.getMessage());
 
     pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
index fd1a8490630..854f39c22bd 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
@@ -32,6 +32,7 @@ import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnviro
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.confignode.i18n.ManagerMessages;
 import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
 import 
org.apache.iotdb.confignode.manager.pipe.metric.sink.PipeConfigRegionSinkMetrics;
@@ -105,10 +106,10 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractSinkSubtask {
       try {
         source.close();
       } catch (Exception closeException) {
-        LOGGER.warn(
-            
ManagerMessages.FAILED_TO_CLOSE_EXTRACTOR_AFTER_FAILED_TO_INITIALIZE_EXTRACTOR
-                + "Ignore this exception.",
-            closeException);
+        PipeLogger.log(
+            LOGGER::warn,
+            closeException,
+            
ManagerMessages.FAILED_TO_CLOSE_EXTRACTOR_AFTER_FAILED_TO_INITIALIZE_EXTRACTOR);
       }
       throw e;
     }
@@ -154,9 +155,11 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractSinkSubtask {
       try {
         outputPipeSink.close();
       } catch (final Exception closeException) {
-        LOGGER.warn(
+        PipeLogger.log(
+            LOGGER::warn,
+            closeException,
             
ManagerMessages.FAILED_TO_CLOSE_SINK_AFTER_FAILED_TO_INITIALIZE_IT_IGNORE,
-            closeException);
+            closeException.getMessage());
       }
       throw e;
     }
@@ -208,19 +211,19 @@ public class PipeConfigNodeSubtask extends 
PipeAbstractSinkSubtask {
     try {
       source.close();
     } catch (final Exception e) {
-      LOGGER.info(ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPEEXTRACTOR, 
e);
+      PipeLogger.log(LOGGER::info, e, 
ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPEEXTRACTOR);
     }
 
     try {
       processor.close();
     } catch (final Exception e) {
-      LOGGER.info(ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPEPROCESSOR, 
e);
+      PipeLogger.log(LOGGER::info, e, 
ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPEPROCESSOR);
     }
 
     try {
       outputPipeSink.close();
     } catch (final Exception e) {
-      LOGGER.info(ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPECONNECTOR, 
e);
+      PipeLogger.log(LOGGER::info, e, 
ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPECONNECTOR);
     } finally {
       // Should be after connector.close()
       super.close();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
index 941d6d6d06c..0d348e16417 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.confignode.i18n.ManagerMessages;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.ProcedureManager;
@@ -95,7 +96,8 @@ public class PipeMetaSyncer {
     isLastPipeSyncSuccessful = false;
 
     if (configManager.getPipeManager().getPipeTaskCoordinator().isLocked()) {
-      LOGGER.warn(
+      PipeLogger.log(
+          LOGGER::warn,
           
ManagerMessages.PIPETASKCOORDINATORLOCK_IS_HELD_BY_ANOTHER_THREAD_SKIP_THIS_ROUND_OF_2);
       return;
     }
@@ -109,7 +111,9 @@ public class PipeMetaSyncer {
             == 
PipeConfig.getInstance().getPipeMetaSyncerAutoRestartPipeCheckIntervalRound()) {
       somePipesNeedRestarting = autoRestartWithLock();
       if (somePipesNeedRestarting) {
-        
LOGGER.info(ManagerMessages.SOME_PIPES_NEED_RESTARTING_WILL_RESTART_THEM_AFTER_THIS_SYNC);
+        PipeLogger.log(
+            LOGGER::info,
+            
ManagerMessages.SOME_PIPES_NEED_RESTARTING_WILL_RESTART_THEM_AFTER_THIS_SYNC);
       }
       pipeAutoRestartRoundCounter.set(0);
     }
@@ -130,19 +134,22 @@ public class PipeMetaSyncer {
         if (handleMetaChangeStatus.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           successfulSync = true;
         } else {
-          LOGGER.warn(
+          PipeLogger.log(
+              LOGGER::warn,
               ManagerMessages.FAILED_TO_HANDLE_PIPE_META_CHANGE_RESULT_STATUS,
               handleMetaChangeStatus);
         }
       }
 
       if (successfulSync) {
-        LOGGER.info(
+        PipeLogger.log(
+            LOGGER::info,
             
ManagerMessages.AFTER_THIS_SUCCESSFUL_SYNC_IF_PIPETASKINFO_IS_EMPTY_DURING_THIS);
         isLastPipeSyncSuccessful = true;
       }
     } else {
-      LOGGER.warn(ManagerMessages.FAILED_TO_SYNC_PIPE_META_RESULT_STATUS, 
metaSyncStatus);
+      PipeLogger.log(
+          LOGGER::warn, 
ManagerMessages.FAILED_TO_SYNC_PIPE_META_RESULT_STATUS, metaSyncStatus);
     }
   }
 
@@ -158,7 +165,8 @@ public class PipeMetaSyncer {
     final AtomicReference<PipeTaskInfo> pipeTaskInfo =
         configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
     if (pipeTaskInfo == null) {
-      
LOGGER.warn(ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_AUTO_RESTART_PIPE_TASK);
+      PipeLogger.log(
+          LOGGER::warn, 
ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_AUTO_RESTART_PIPE_TASK);
       return false;
     }
     try {
@@ -176,7 +184,7 @@ public class PipeMetaSyncer {
           .getRegionMaintainHandler()
           .checkAndRepairConsensusPipes();
     } catch (Exception e) {
-      LOGGER.warn(ManagerMessages.FAILED_TO_CHECK_AND_REPAIR_CONSENSUS_PIPES, 
e);
+      PipeLogger.log(LOGGER::warn, e, 
ManagerMessages.FAILED_TO_CHECK_AND_REPAIR_CONSENSUS_PIPES);
     }
   }
 
@@ -184,7 +192,9 @@ public class PipeMetaSyncer {
     final AtomicReference<PipeTaskInfo> pipeTaskInfo =
         configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
     if (pipeTaskInfo == null) {
-      
LOGGER.warn(ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_HANDLING_SUCCESSFUL_RESTART);
+      PipeLogger.log(
+          LOGGER::warn,
+          
ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_HANDLING_SUCCESSFUL_RESTART);
       return false;
     }
     try {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index 597120ffe1c..97c6795daf4 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
 import org.apache.iotdb.confignode.i18n.ManagerMessages;
 import org.apache.iotdb.confignode.manager.ConfigManager;
@@ -93,7 +94,8 @@ public class PipeHeartbeatParser {
               final AtomicReference<PipeTaskInfo> pipeTaskInfo =
                   
configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
               if (pipeTaskInfo == null) {
-                LOGGER.warn(
+                PipeLogger.log(
+                    LOGGER::warn,
                     
ManagerMessages.FAILED_TO_ACQUIRE_LOCK_WHEN_PARSEHEARTBEAT_FROM_NODE_ID,
                     nodeId);
                 return;
@@ -127,8 +129,10 @@ public class PipeHeartbeatParser {
         configManager.getNodeManager().getRegisteredDataNodeCount()
             + (PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled() ? 1 
: 0);
     if (expectedNodeCount <= 0) {
-      LOGGER.warn(
-          ManagerMessages.EXPECTED_PIPE_HEARTBEAT_NODE_COUNT_IS_FALLBACK_TO_1, 
expectedNodeCount);
+      PipeLogger.log(
+          LOGGER::warn,
+          ManagerMessages.EXPECTED_PIPE_HEARTBEAT_NODE_COUNT_IS_FALLBACK_TO_1,
+          expectedNodeCount);
       return 1;
     }
     return expectedNodeCount;
@@ -142,10 +146,6 @@ public class PipeHeartbeatParser {
       final PipeStaticMeta staticMeta = 
pipeMetaFromCoordinator.getStaticMeta();
       final PipeMeta pipeMetaFromAgent = pipeHeartbeat.getPipeMeta(staticMeta);
       if (pipeMetaFromAgent == null) {
-        LOGGER.info(
-            
ManagerMessages.PIPERUNTIMECOORDINATOR_MEETS_ERROR_IN_UPDATING_PIPEMETAKEEPER
-                + "pipeMetaFromAgent is null, pipeMetaFromCoordinator: {}",
-            pipeMetaFromCoordinator);
         continue;
       }
 
@@ -157,8 +157,9 @@ public class PipeHeartbeatParser {
       if (Boolean.TRUE.equals(isPipeCompletedFromAgent)) {
 
         temporaryMeta.markDataNodeCompleted(nodeId);
-        LOGGER.info(
-            "Detected historical pipe completion report from DataNode {} for 
pipe {}. remainingEventCount: {}, remainingTime: {}, completedDataNodes: {}",
+        PipeLogger.log(
+            LOGGER::info,
+            
ManagerMessages.DETECTED_HISTORICAL_PIPE_COMPLETION_REPORT_FROM_DATANODE,
             nodeId,
             staticMeta.getPipeName(),
             pipeHeartbeat.getRemainingEventCount(staticMeta),
@@ -169,14 +170,16 @@ public class PipeHeartbeatParser {
             
configManager.getNodeManager().getRegisteredDataNodeLocations().keySet();
         
uncompletedDataNodeIds.removeAll(temporaryMeta.getCompletedDataNodeIds());
         if (uncompletedDataNodeIds.isEmpty()) {
-          LOGGER.info(
-              "All DataNodes reported historical pipe {} completed. 
globalRemainingEventCount: {}, globalRemainingTime: {}, staticMeta: {}",
+          PipeLogger.log(
+              LOGGER::info,
+              ManagerMessages.ALL_DATANODES_REPORTED_HISTORICAL_PIPE_COMPLETED,
               staticMeta.getPipeName(),
               temporaryMeta.getGlobalRemainingEvents(),
               temporaryMeta.getGlobalRemainingTime(),
               staticMeta);
           pipeTaskInfo.get().removePipeMeta(staticMeta.getPipeName());
-          LOGGER.info(
+          PipeLogger.log(
+              LOGGER::info,
               
ManagerMessages.DETECTED_COMPLETION_OF_PIPE_STATIC_META_REMOVE_IT,
               staticMeta.getPipeName(),
               staticMeta);
@@ -267,7 +270,9 @@ public class PipeHeartbeatParser {
               needWriteConsensusOnConfigNodes.set(true);
               needPushPipeMetaToDataNodes.set(false);
 
-              LOGGER.warn(
+              PipeLogger.log(
+                  LOGGER::warn,
+                  exception,
                   
ManagerMessages.DETECT_PIPERUNTIMECRITICALEXCEPTION_FROM_AGENT_STOP_PIPE,
                   exception,
                   pipeName);
@@ -296,11 +301,13 @@ public class PipeHeartbeatParser {
                             needWriteConsensusOnConfigNodes.set(true);
                             needPushPipeMetaToDataNodes.set(false);
 
-                            LOGGER.warn(
-                                String.format(
-                                    "Detect 
PipeRuntimeConnectorCriticalException %s "
-                                        + "from agent, stop pipe %s.",
-                                    exception, pipeName));
+                            PipeLogger.log(
+                                LOGGER::warn,
+                                exception,
+                                ManagerMessages
+                                    
.DETECT_PIPERUNTIMESINKCRITICALEXCEPTION_FROM_AGENT_STOP_PIPE,
+                                exception,
+                                pipeName);
                           });
             }
           }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
index bbdaee8e12c..91d36f4e441 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
 import 
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
 import 
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
@@ -85,7 +86,8 @@ public class PipeHeartbeatScheduler {
     }
 
     if (configManager.getPipeManager().getPipeTaskCoordinator().isLocked()) {
-      LOGGER.warn(
+      PipeLogger.log(
+          LOGGER::warn,
           
ManagerMessages.PIPETASKCOORDINATORLOCK_IS_HELD_BY_ANOTHER_THREAD_SKIP_THIS_ROUND_OF);
       return;
     }
@@ -130,7 +132,8 @@ public class PipeHeartbeatScheduler {
               configNodeResp.getPipeRemainingEventCountList(),
               configNodeResp.getPipeRemainingTimeList()));
     } catch (final Exception e) {
-      
LOGGER.warn(ManagerMessages.FAILED_TO_COLLECT_PIPE_META_LIST_FROM_CONFIG_NODE_TASK,
 e);
+      PipeLogger.log(
+          LOGGER::warn, e, 
ManagerMessages.FAILED_TO_COLLECT_PIPE_META_LIST_FROM_CONFIG_NODE_TASK);
     }
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index 1d4d10006b6..490c3f03892 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.confignode.manager.pipe.coordinator.task;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import 
org.apache.iotdb.confignode.consensus.request.read.pipe.task.ShowPipePlanV2;
 import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
 import org.apache.iotdb.confignode.i18n.ConfigNodeMessages;
@@ -215,7 +216,10 @@ public class PipeTaskCoordinator {
           .filter(req.whereClause, req.pipeName, req.isTableModel, 
req.userName)
           .convertToTShowPipeResp();
     } catch (final ConsensusException e) {
-      
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_READ_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
 e);
+      PipeLogger.log(
+          LOGGER::warn,
+          e,
+          
ConfigNodeMessages.FAILED_IN_THE_READ_API_EXECUTING_THE_CONSENSUS_LAYER_DUE);
       final TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
       res.setMessage(e.getMessage());
       return new PipeTableResp(res, 
Collections.emptyList()).convertToTShowPipeResp();
@@ -227,7 +231,7 @@ public class PipeTaskCoordinator {
       return ((PipeTableResp) configManager.getConsensusManager().read(new 
ShowPipePlanV2()))
           .convertToTGetAllPipeInfoResp();
     } catch (IOException | ConsensusException e) {
-      LOGGER.warn(ManagerMessages.FAILED_TO_GET_ALL_PIPE_INFO, e);
+      PipeLogger.log(LOGGER::warn, e, 
ManagerMessages.FAILED_TO_GET_ALL_PIPE_INFO);
       return new TGetAllPipeInfoResp(
           new 
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage()),
           Collections.emptyList());
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index 0f788435394..15991cfc90e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.confignode.manager.pipe.coordinator.task;
 
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.confignode.i18n.ManagerMessages;
 
 import org.slf4j.Logger;
@@ -53,7 +54,8 @@ public class PipeTaskCoordinatorLock {
           Thread.currentThread().getName());
     } catch (final InterruptedException e) {
       Thread.currentThread().interrupt();
-      LOGGER.error(
+      PipeLogger.log(
+          LOGGER::error,
           
ManagerMessages.INTERRUPTED_WHILE_WAITING_FOR_PIPETASKCOORDINATOR_LOCK_CURRENT_THREAD,
           Thread.currentThread().getName());
     }
@@ -70,14 +72,16 @@ public class PipeTaskCoordinatorLock {
             Thread.currentThread().getName());
         return true;
       } else {
-        LOGGER.info(
+        PipeLogger.log(
+            LOGGER::info,
             
ManagerMessages.PIPETASKCOORDINATOR_LOCK_FAILED_TO_ACQUIRE_BY_THREAD_BECAUSE_OF_TIMEOUT,
             Thread.currentThread().getName());
         return false;
       }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      LOGGER.error(
+      PipeLogger.log(
+          LOGGER::error,
           
ManagerMessages.INTERRUPTED_WHILE_WAITING_FOR_PIPETASKCOORDINATOR_LOCK_CURRENT_THREAD,
           Thread.currentThread().getName());
       return false;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index d4117080353..3c2e331b0a1 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
@@ -636,7 +637,8 @@ public class PipeTaskInfo implements SnapshotProcessor {
                             // external source pipe tasks are not balanced 
here since non-leaders
                             // don't know about RegionLeader Map and will be 
balanced in the meta
                             // sync procedure
-                            LOGGER.info(
+                            PipeLogger.log(
+                                LOGGER::info,
                                 
ConfigNodeMessages.PIPE_IS_USING_EXTERNAL_SOURCE_SKIP_REGION,
                                 pipeMeta.getStaticMeta().getPipeName(),
                                 plan.getConsensusGroupId2NewLeaderIdMap());
@@ -905,7 +907,10 @@ public class PipeTaskInfo implements SnapshotProcessor {
             });
 
     if (needRestart.get()) {
-      
LOGGER.info(ConfigNodeMessages.PIPEMETASYNCER_IS_TRYING_TO_RESTART_THE_PIPES, 
pipeToRestart);
+      PipeLogger.log(
+          LOGGER::info,
+          ConfigNodeMessages.PIPEMETASYNCER_IS_TRYING_TO_RESTART_THE_PIPES,
+          pipeToRestart);
     }
     return needRestart.get();
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index 2c5b9566fce..93653923375 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
 import org.apache.iotdb.confignode.i18n.ConfigNodeMessages;
 import org.apache.iotdb.confignode.i18n.ProcedureMessages;
@@ -70,7 +71,8 @@ public class PipeHandleLeaderChangeProcedure extends 
AbstractOperatePipeProcedur
 
   @Override
   public boolean executeFromValidateTask(ConfigNodeProcedureEnv env) {
-    
LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMVALIDATETASK);
+    PipeLogger.log(
+        LOGGER::info, 
ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMVALIDATETASK);
 
     // Nothing needs to be checked
     return true;
@@ -78,14 +80,18 @@ public class PipeHandleLeaderChangeProcedure extends 
AbstractOperatePipeProcedur
 
   @Override
   public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
-    
LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMCALCULATEINFOFORTASK);
+    PipeLogger.log(
+        LOGGER::info,
+        
ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMCALCULATEINFOFORTASK);
 
     // Nothing needs to be calculated
   }
 
   @Override
   public void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
-    
LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMHANDLEONCONFIGNODES);
+    PipeLogger.log(
+        LOGGER::info,
+        
ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMHANDLEONCONFIGNODES);
 
     final Map<TConsensusGroupId, Integer> 
newConsensusGroupIdToLeaderConsensusIdMap =
         new HashMap<>();
@@ -100,7 +106,10 @@ public class PipeHandleLeaderChangeProcedure extends 
AbstractOperatePipeProcedur
     try {
       response = 
env.getConfigManager().getConsensusManager().write(pipeHandleLeaderChangePlan);
     } catch (ConsensusException e) {
-      
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
 e);
+      PipeLogger.log(
+          LOGGER::warn,
+          e,
+          
ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE);
       response = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
       response.setMessage(e.getMessage());
     }
@@ -111,35 +120,44 @@ public class PipeHandleLeaderChangeProcedure extends 
AbstractOperatePipeProcedur
 
   @Override
   public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
-    
LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMHANDLEONDATANODES);
+    PipeLogger.log(
+        LOGGER::info,
+        
ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMHANDLEONDATANODES);
 
     pushPipeMetaToDataNodesIgnoreException(env);
   }
 
   @Override
   public void rollbackFromValidateTask(ConfigNodeProcedureEnv env) {
-    
LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMVALIDATETASK);
+    PipeLogger.log(
+        LOGGER::info, 
ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMVALIDATETASK);
 
     // Nothing to do
   }
 
   @Override
   public void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
-    
LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMCALCULATEINFOFORTASK);
+    PipeLogger.log(
+        LOGGER::info,
+        
ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMCALCULATEINFOFORTASK);
 
     // Nothing to do
   }
 
   @Override
   public void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) 
{
-    
LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMHANDLEONCONFIGNODES);
+    PipeLogger.log(
+        LOGGER::info,
+        
ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMHANDLEONCONFIGNODES);
 
     // Nothing to do
   }
 
   @Override
   public void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
-    
LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMCREATEONDATANODES);
+    PipeLogger.log(
+        LOGGER::info,
+        
ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMCREATEONDATANODES);
 
     // Nothing to do
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index a0dec367352..394f7e2fa9f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
 import org.apache.iotdb.confignode.i18n.ConfigNodeMessages;
 import org.apache.iotdb.confignode.i18n.ProcedureMessages;
@@ -80,7 +81,8 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
 
   @Override
   public boolean executeFromValidateTask(ConfigNodeProcedureEnv env) {
-    
LOGGER.info(ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_EXECUTEFROMVALIDATETASK);
+    PipeLogger.log(
+        LOGGER::info, 
ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_EXECUTEFROMVALIDATETASK);
 
     // Do nothing
     return true;
@@ -88,14 +90,17 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
 
   @Override
   public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
-    
LOGGER.info(ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_EXECUTEFROMCALCULATEINFOFORTASK);
+    PipeLogger.log(
+        LOGGER::info,
+        
ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_EXECUTEFROMCALCULATEINFOFORTASK);
 
     // Do nothing
   }
 
   @Override
   public void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
-    LOGGER.info(
+    PipeLogger.log(
+        LOGGER::info,
         
ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_EXECUTEFROMWRITECONFIGNODECONSENSUS);
 
     if (!needWriteConsensusOnConfigNodes) {
@@ -114,7 +119,10 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
               .getConsensusManager()
               .write(new PipeHandleMetaChangePlan(pipeMetaList));
     } catch (ConsensusException e) {
-      
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
 e);
+      PipeLogger.log(
+          LOGGER::warn,
+          e,
+          
ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE);
       response = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
       response.setMessage(e.getMessage());
     }
@@ -125,7 +133,8 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
 
   @Override
   public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
-    
LOGGER.info(ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_EXECUTEFROMHANDLEONDATANODES);
+    PipeLogger.log(
+        LOGGER::info, 
ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_EXECUTEFROMHANDLEONDATANODES);
 
     if (!needPushPipeMetaToDataNodes) {
       return;
@@ -136,21 +145,25 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
 
   @Override
   public void rollbackFromValidateTask(ConfigNodeProcedureEnv env) {
-    
LOGGER.info(ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_ROLLBACKFROMVALIDATETASK);
+    PipeLogger.log(
+        LOGGER::info, 
ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_ROLLBACKFROMVALIDATETASK);
 
     // Do nothing
   }
 
   @Override
   public void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
-    
LOGGER.info(ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_ROLLBACKFROMCALCULATEINFOFORTASK);
+    PipeLogger.log(
+        LOGGER::info,
+        
ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_ROLLBACKFROMCALCULATEINFOFORTASK);
 
     // Do nothing
   }
 
   @Override
   public void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) 
{
-    LOGGER.info(
+    PipeLogger.log(
+        LOGGER::info,
         
ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_ROLLBACKFROMWRITECONFIGNODECONSENSUS);
 
     // Do nothing
@@ -158,7 +171,9 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
 
   @Override
   public void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
-    
LOGGER.info(ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_ROLLBACKFROMOPERATEONDATANODES);
+    PipeLogger.log(
+        LOGGER::info,
+        
ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_ROLLBACKFROMOPERATEONDATANODES);
 
     // Do nothing
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index 33b909eae27..072555146c1 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
 import org.apache.iotdb.confignode.i18n.ConfigNodeMessages;
 import org.apache.iotdb.confignode.i18n.ProcedureMessages;
@@ -187,7 +188,10 @@ public class PipeMetaSyncProcedure extends 
AbstractOperatePipeProcedureV2 {
               .getConsensusManager()
               .write(new PipeHandleMetaChangePlan(pipeMetaList));
     } catch (ConsensusException e) {
-      
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
 e);
+      PipeLogger.log(
+          LOGGER::warn,
+          e,
+          
ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE);
       response = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
       response.setMessage(e.getMessage());
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 8ae7e7eb610..068d44d0540 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.conf.TrimProperties;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
 import org.apache.iotdb.commons.memory.MemoryManager;
 import org.apache.iotdb.commons.pipe.config.PipeDescriptor;
+import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer;
 import org.apache.iotdb.commons.schema.SchemaConstant;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
@@ -37,7 +38,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
 import org.apache.iotdb.consensus.config.IoTConsensusV2Config;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.i18n.DataNodeMiscMessages;
-import org.apache.iotdb.db.pipe.resource.log.PipePeriodicalLogReducer;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.LastCacheLoadStrategy;
 import org.apache.iotdb.db.service.metrics.IoTDBInternalLocalReporter;
 import org.apache.iotdb.db.storageengine.StorageEngine;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
index 5e75199e388..3cdf92d5392 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
+import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.utils.TestOnly;
@@ -42,7 +43,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.i18n.DataNodePipeMessages;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import 
org.apache.iotdb.db.pipe.resource.PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner;
-import org.apache.iotdb.db.pipe.resource.log.PipePeriodicalLogReducer;
 import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
@@ -214,16 +214,20 @@ public class PipeDataNodeRuntimeAgent implements IService 
{
     if (event.getPipeTaskMeta() != null) {
       report(event.getPipeTaskMeta(), pipeRuntimeException);
     } else {
-      LOGGER.warn(DataNodePipeMessages.ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A, 
pipeRuntimeException);
+      PipeLogger.log(
+          LOGGER::warn,
+          pipeRuntimeException,
+          DataNodePipeMessages.ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A);
     }
   }
 
   public void report(PipeTaskMeta pipeTaskMeta, PipeRuntimeException 
pipeRuntimeException) {
-    LOGGER.warn(
+    PipeLogger.log(
+        LOGGER::warn,
+        pipeRuntimeException,
         
DataNodePipeMessages.REPORT_PIPERUNTIMEEXCEPTION_TO_LOCAL_PIPETASKMETA_EXCEPTION_MESSAGE,
         pipeTaskMeta,
-        pipeRuntimeException.getMessage(),
-        pipeRuntimeException);
+        pipeRuntimeException.getMessage());
 
     // Quick stop all pipes locally if critical exception occurs,
     // no need to wait for the next heartbeat cycle.
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index e6c8ddefc99..f4cb3b281e9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -36,6 +36,7 @@ import 
org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
 import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
+import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer;
 import 
org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqHandler;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
@@ -61,7 +62,6 @@ import 
org.apache.iotdb.db.pipe.receiver.visitor.PipeTableStatementDataTypeConve
 import 
org.apache.iotdb.db.pipe.receiver.visitor.PipeTreeStatementDataTypeConvertExecutionVisitor;
 import 
org.apache.iotdb.db.pipe.receiver.visitor.PipeTreeStatementToBatchVisitor;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
-import org.apache.iotdb.db.pipe.resource.log.PipePeriodicalLogReducer;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipePeriodicalLogReducer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipePeriodicalLogReducer.java
deleted file mode 100644
index 946450192c5..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipePeriodicalLogReducer.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.resource.log;
-
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.db.i18n.DataNodePipeMessages;
-import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
-import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
-
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import org.apache.tsfile.utils.RamUsageEstimator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-
-public class PipePeriodicalLogReducer {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipePeriodicalLogReducer.class);
-  private static final PipeMemoryBlock block;
-  protected static final Cache<String, String> loggerCache;
-
-  static {
-    // Never close because it's static
-    block =
-        PipeDataNodeResourceManager.memory()
-            
.tryAllocate(PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes());
-    loggerCache =
-        Caffeine.newBuilder()
-            .expireAfterWrite(
-                
PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds(), 
TimeUnit.SECONDS)
-            .weigher(
-                (k, v) ->
-                    Math.toIntExact(
-                        RamUsageEstimator.sizeOf((String) k)
-                            + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY))
-            .maximumWeight(block.getMemoryUsageInBytes())
-            .build();
-  }
-
-  public static boolean log(
-      final Consumer<String> loggerFunction, final String rawMessage, final 
Object... formatter) {
-    final String loggerMessage = String.format(rawMessage, formatter);
-    if (!loggerCache.asMap().containsKey(loggerMessage)) {
-      loggerCache.put(loggerMessage, loggerMessage);
-      loggerFunction.accept(loggerMessage);
-      return true;
-    }
-    return false;
-  }
-
-  public static void update() {
-    loggerCache
-        .policy()
-        .expireAfterWrite()
-        .ifPresent(
-            time ->
-                time.setExpiresAfter(
-                    
PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds(),
-                    TimeUnit.SECONDS));
-    PipeDataNodeResourceManager.memory()
-        .resize(block, 
PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes(), false);
-    LOGGER.info(
-        DataNodePipeMessages.PIPEPERIODICALLOGREDUCER_IS_ALLOCATED_TO_BYTES,
-        block.getMemoryUsageInBytes());
-    loggerCache
-        .policy()
-        .eviction()
-        .ifPresent(eviction -> 
eviction.setMaximum(block.getMemoryUsageInBytes()));
-  }
-
-  private PipePeriodicalLogReducer() {
-    // static
-  }
-}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogger.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogger.java
index 9d7be703c9c..97ff4afba63 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogger.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogger.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.commons.pipe.resource.log;
 
+import org.slf4j.helpers.MessageFormatter;
+
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 import java.util.function.Consumer;
@@ -26,11 +28,11 @@ import java.util.function.Consumer;
 public class PipeLogger {
   private static PipePeriodicalLogger logger =
       (loggerFunction, rawMessage, formatter) ->
-          loggerFunction.accept(String.format(rawMessage, formatter));
+          loggerFunction.accept(formatMessage(rawMessage, formatter));
 
   public static void log(
       final Consumer<String> loggerFunction, final String rawMessage, final Object... formatter) {
-    logger.log(loggerFunction, "%s", format(rawMessage, formatter));
+    logger.log(loggerFunction, "%s", formatMessage(rawMessage, formatter));
   }
 
   public static void log(
@@ -40,7 +42,7 @@ public class PipeLogger {
       final Object... formatter) {
     final ByteArrayOutputStream out = new ByteArrayOutputStream();
     throwable.printStackTrace(new PrintStream(out));
-    logger.log(loggerFunction, "%s", format(rawMessage, formatter) + "\n" + out);
+    logger.log(loggerFunction, "%s", formatMessage(rawMessage, formatter) + "\n" + out);
   }
 
   public static void setLogger(final PipePeriodicalLogger logger) {
@@ -51,10 +53,14 @@ public class PipeLogger {
     // static
   }
 
-  private static String format(final String rawMessage, final Object... formatter) {
-    return formatter == null || formatter.length == 0
-        ? rawMessage
-        : String.format(rawMessage, formatter);
+  static String formatMessage(final String rawMessage, final Object... formatter) {
+    if (formatter == null || formatter.length == 0) {
+      return rawMessage;
+    }
+    if (rawMessage.contains("{}")) {
+      return MessageFormatter.arrayFormat(rawMessage, formatter).getMessage();
+    }
+    return String.format(rawMessage, formatter);
   }
 
   @FunctionalInterface
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducer.java
new file mode 100644
index 00000000000..30074f90ebf
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.resource.log;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+public class PipePeriodicalLogReducer {
+
+  protected static final Cache<String, String> LOGGER_CACHE =
+      Caffeine.newBuilder()
+          .expireAfterWrite(
+              PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds(), TimeUnit.SECONDS)
+          .weigher(PipePeriodicalLogReducer::estimateSize)
+          .maximumWeight(PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes())
+          .build();
+
+  private static int estimateSize(final String key, final String value) {
+    return Math.toIntExact(
+        RamUsageEstimator.sizeOf(key) + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY);
+  }
+
+  public static boolean log(
+      final Consumer<String> loggerFunction, final String rawMessage, final Object... formatter) {
+    final String loggerMessage = PipeLogger.formatMessage(rawMessage, formatter);
+    if (!LOGGER_CACHE.asMap().containsKey(loggerMessage)) {
+      LOGGER_CACHE.put(loggerMessage, loggerMessage);
+      loggerFunction.accept(loggerMessage);
+      return true;
+    }
+    return false;
+  }
+
+  public static void update() {
+    update(PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes());
+  }
+
+  public static void update(final long maxWeight) {
+    LOGGER_CACHE
+        .policy()
+        .expireAfterWrite()
+        .ifPresent(
+            time ->
+                time.setExpiresAfter(
+                    PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds(),
+                    TimeUnit.SECONDS));
+    LOGGER_CACHE.policy().eviction().ifPresent(eviction -> eviction.setMaximum(maxWeight));
+  }
+
+  private PipePeriodicalLogReducer() {
+    // static
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/resource/PipeLoggerTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/resource/PipeLoggerTest.java
index 122dfea7ca1..d37c109c9f5 100644
--- 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/resource/PipeLoggerTest.java
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/resource/PipeLoggerTest.java
@@ -42,6 +42,20 @@ public class PipeLoggerTest {
     Assert.assertEquals("data_{sink.password=%/broken}", message.get());
   }
 
+  @Test
+  public void testLogMessageWithSlf4jPlaceholder() {
+    final AtomicReference<String> message = new AtomicReference<>();
+
+    setStringFormatLogger();
+    try {
+      PipeLogger.log(message::set, "PipeLoggerCacheMaxSizeInBytes: {}", 1024);
+    } finally {
+      setStringFormatLogger();
+    }
+
+    Assert.assertEquals("PipeLoggerCacheMaxSizeInBytes: 1024", message.get());
+  }
+
   @Test
   public void testLogThrowableWithPercentInStackTrace() {
     final AtomicReference<String> message = new AtomicReference<>();

Reply via email to