This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ded266441d8 Pipe: Further reduce the pipe logs & Added configurations 
for tsFile segment lock (#16315)
ded266441d8 is described below

commit ded266441d865c294a963ae8c113364b30384e54
Author: Caideyipi <[email protected]>
AuthorDate: Tue Sep 2 09:39:46 2025 +0800

    Pipe: Further reduce the pipe logs & Added configurations for tsFile 
segment lock (#16315)
    
    * logger-fix
    
    * add-ons
    
    * fix
    
    * fix
    
    * fix
    
    * warn-fix
    
    * further-fix
    
    * fix
---
 .../confignode/persistence/pipe/PipeTaskInfo.java  |   2 +-
 .../impl/pipe/AbstractOperatePipeProcedureV2.java  |  10 +-
 .../impl/pipe/runtime/PipeMetaSyncProcedure.java   |  18 ++--
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  29 +++---
 .../tsfile/PipeTsFileResourceSegmentLock.java      |  63 +++++-------
 .../apache/iotdb/commons/conf/CommonConfig.java    |  23 ++++-
 .../iotdb/commons/pipe/config/PipeConfig.java      |   7 ++
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |   5 +
 .../commons/pipe/receiver/IoTDBFileReceiver.java   | 106 ++++++++++++++-------
 9 files changed, 158 insertions(+), 105 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 6aadee63719..7b2fd6ad9b4 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -655,7 +655,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
   }
 
   private TSStatus handleMetaChangesInternal(final PipeHandleMetaChangePlan 
plan) {
-    LOGGER.info("Handling pipe meta changes ...");
+    LOGGER.debug("Handling pipe meta changes ...");
 
     pipeMetaKeeper.clear();
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index c48c450157c..25466d33983 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -109,12 +109,12 @@ public abstract class AbstractOperatePipeProcedureV2
 
   @Override
   protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv 
configNodeProcedureEnv) {
-    LOGGER.info("ProcedureId {} try to acquire pipe lock.", getProcId());
+    LOGGER.debug("ProcedureId {} try to acquire pipe lock.", getProcId());
     pipeTaskInfo = acquireLockInternal(configNodeProcedureEnv);
     if (pipeTaskInfo == null) {
       LOGGER.warn("ProcedureId {} failed to acquire pipe lock.", getProcId());
     } else {
-      LOGGER.info("ProcedureId {} acquired pipe lock.", getProcId());
+      LOGGER.debug("ProcedureId {} acquired pipe lock.", getProcId());
     }
 
     final ProcedureLockState procedureLockState = 
super.acquireLock(configNodeProcedureEnv);
@@ -125,7 +125,7 @@ public abstract class AbstractOperatePipeProcedureV2
               "ProcedureId {}: LOCK_ACQUIRED. The following procedure should 
not be executed without pipe lock.",
               getProcId());
         } else {
-          LOGGER.info(
+          LOGGER.debug(
               "ProcedureId {}: LOCK_ACQUIRED. The following procedure should 
be executed with pipe lock.",
               getProcId());
         }
@@ -134,7 +134,7 @@ public abstract class AbstractOperatePipeProcedureV2
         if (pipeTaskInfo == null) {
           LOGGER.warn("ProcedureId {}: LOCK_EVENT_WAIT. Without acquiring pipe 
lock.", getProcId());
         } else {
-          LOGGER.info("ProcedureId {}: LOCK_EVENT_WAIT. Pipe lock will be 
released.", getProcId());
+          LOGGER.debug("ProcedureId {}: LOCK_EVENT_WAIT. Pipe lock will be 
released.", getProcId());
           configNodeProcedureEnv
               .getConfigManager()
               .getPipeManager()
@@ -173,7 +173,7 @@ public abstract class AbstractOperatePipeProcedureV2
     if (pipeTaskInfo == null) {
       LOGGER.warn("ProcedureId {} release lock. No need to release pipe 
lock.", getProcId());
     } else {
-      LOGGER.info("ProcedureId {} release lock. Pipe lock will be released.", 
getProcId());
+      LOGGER.debug("ProcedureId {} release lock. Pipe lock will be released.", 
getProcId());
       if (this instanceof PipeMetaSyncProcedure) {
         configNodeProcedureEnv
             .getConfigManager()
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index 20e23a64701..393a8bd5ab8 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -87,7 +87,7 @@ public class PipeMetaSyncProcedure extends 
AbstractOperatePipeProcedureV2 {
     if (System.currentTimeMillis() - LAST_EXECUTION_TIME.get() < 
MIN_EXECUTION_INTERVAL_MS) {
       // Skip by setting the pipeTaskInfo to null
       pipeTaskInfo = null;
-      LOGGER.info(
+      LOGGER.debug(
           "PipeMetaSyncProcedure: acquireLock, skip the procedure due to the 
last execution time {}",
           LAST_EXECUTION_TIME.get());
       return ProcedureLockState.LOCK_ACQUIRED;
@@ -103,7 +103,7 @@ public class PipeMetaSyncProcedure extends 
AbstractOperatePipeProcedureV2 {
 
   @Override
   public boolean executeFromValidateTask(ConfigNodeProcedureEnv env) {
-    LOGGER.info("PipeMetaSyncProcedure: executeFromValidateTask");
+    LOGGER.debug("PipeMetaSyncProcedure: executeFromValidateTask");
 
     LAST_EXECUTION_TIME.set(System.currentTimeMillis());
     return true;
@@ -111,7 +111,7 @@ public class PipeMetaSyncProcedure extends 
AbstractOperatePipeProcedureV2 {
 
   @Override
   public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
-    LOGGER.info("PipeMetaSyncProcedure: executeFromCalculateInfoForTask");
+    LOGGER.debug("PipeMetaSyncProcedure: executeFromCalculateInfoForTask");
 
     // Re-balance the external source tasks here in case of any changes in the 
dataRegion
     pipeTaskInfo
@@ -170,7 +170,7 @@ public class PipeMetaSyncProcedure extends 
AbstractOperatePipeProcedureV2 {
 
   @Override
   public void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
-    LOGGER.info("PipeMetaSyncProcedure: executeFromWriteConfigNodeConsensus");
+    LOGGER.debug("PipeMetaSyncProcedure: executeFromWriteConfigNodeConsensus");
 
     final List<PipeMeta> pipeMetaList = new ArrayList<>();
     for (final PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) {
@@ -196,7 +196,7 @@ public class PipeMetaSyncProcedure extends 
AbstractOperatePipeProcedureV2 {
   @Override
   public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
       throws PipeException, IOException {
-    LOGGER.info("PipeMetaSyncProcedure: executeFromOperateOnDataNodes");
+    LOGGER.debug("PipeMetaSyncProcedure: executeFromOperateOnDataNodes");
 
     Map<Integer, TPushPipeMetaResp> respMap = pushPipeMetaToDataNodes(env);
     if (pipeTaskInfo.get().recordDataNodePushPipeMetaExceptions(respMap)) {
@@ -209,28 +209,28 @@ public class PipeMetaSyncProcedure extends 
AbstractOperatePipeProcedureV2 {
 
   @Override
   public void rollbackFromValidateTask(ConfigNodeProcedureEnv env) {
-    LOGGER.info("PipeMetaSyncProcedure: rollbackFromValidateTask");
+    LOGGER.debug("PipeMetaSyncProcedure: rollbackFromValidateTask");
 
     // Do nothing
   }
 
   @Override
   public void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
-    LOGGER.info("PipeMetaSyncProcedure: rollbackFromCalculateInfoForTask");
+    LOGGER.debug("PipeMetaSyncProcedure: rollbackFromCalculateInfoForTask");
 
     // Do nothing
   }
 
   @Override
   public void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) 
{
-    LOGGER.info("PipeMetaSyncProcedure: rollbackFromWriteConfigNodeConsensus");
+    LOGGER.debug("PipeMetaSyncProcedure: 
rollbackFromWriteConfigNodeConsensus");
 
     // Do nothing
   }
 
   @Override
   public void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
-    LOGGER.info("PipeMetaSyncProcedure: rollbackFromOperateOnDataNodes");
+    LOGGER.debug("PipeMetaSyncProcedure: rollbackFromOperateOnDataNodes");
 
     // Do nothing
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index cb2b86efafc..0469bb7b152 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
 import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import 
org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqHandler;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
@@ -449,7 +450,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     } catch (final Exception e) {
       final String error =
           String.format("Exception %s encountered while handling request %s.", 
e.getMessage(), req);
-      LOGGER.warn("Receiver id = {}: {}", receiverId.get(), error, e);
+      PipeLogger.log(LOGGER::warn, e, "Receiver id = %s: %s", 
receiverId.get(), error);
       return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, 
error));
     }
   }
@@ -661,7 +662,8 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
       final TSStatus status =
           ((AlterLogicalViewNode) 
req.getPlanNode()).checkPermissionBeforeProcess(username);
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        LOGGER.warn(
+        PipeLogger.log(
+            LOGGER::warn,
             "Receiver id = {}: Failed to check authority for statement {}, 
username = {}, response = {}.",
             receiverId.get(),
             StatementType.ALTER_LOGICAL_VIEW.name(),
@@ -812,7 +814,8 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           || result.getCode() == 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
         return result;
       } else {
-        LOGGER.warn(
+        PipeLogger.log(
+            LOGGER::warn,
             "Receiver id = {}: Failure status encountered while executing 
statement {}: {}",
             receiverId.get(),
             statement,
@@ -820,7 +823,8 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
         return statement.accept(STATEMENT_STATUS_VISITOR, result);
       }
     } catch (final Exception e) {
-      LOGGER.warn(
+      PipeLogger.log(
+          LOGGER::warn,
           "Receiver id = {}: Exception encountered while executing statement 
{}: ",
           receiverId.get(),
           statement,
@@ -873,7 +877,8 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
       final TSStatus permissionCheckStatus =
           AuthorityChecker.checkAuthority(statement, clientSession);
       if (permissionCheckStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        LOGGER.warn(
+        PipeLogger.log(
+            LOGGER::warn,
             "Receiver id = {}: Failed to check authority for statement {}, 
username = {}, response = {}.",
             receiverId.get(),
             statement.getType().name(),
@@ -1054,19 +1059,21 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
       // No strong need to handle the failure result
       if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
           || result.getCode() == 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())) {
-        LOGGER.warn(
-            "Receiver id = {}: Failure status encountered while executing 
statement {}: {}",
+        PipeLogger.log(
+            LOGGER::warn,
+            "Receiver id = %s: Failure status encountered while executing 
statement %s: %s",
             receiverId.get(),
             statement,
             result);
       }
       return result;
     } catch (final Exception e) {
-      LOGGER.warn(
-          "Receiver id = {}: Exception encountered while executing statement 
{}: ",
+      PipeLogger.log(
+          LOGGER::warn,
+          e,
+          "Receiver id = %s: Exception encountered while executing statement 
%s: ",
           receiverId.get(),
-          statement,
-          e);
+          statement);
       return new 
TSStatus(TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR.getStatusCode())
           .setMessage(e.getMessage());
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java
index d1bf7fef2ad..cd1be83e55f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.resource.tsfile;
 
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 
 import org.slf4j.Logger;
@@ -31,25 +32,29 @@ import java.util.concurrent.locks.ReentrantLock;
 public class PipeTsFileResourceSegmentLock {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTsFileResourceSegmentLock.class);
-
   private static final int SEGMENT_LOCK_MIN_SIZE = 32;
   private static final int SEGMENT_LOCK_MAX_SIZE = 128;
-
   private volatile ReentrantLock[] locks;
 
   private void initIfNecessary() {
     if (locks == null) {
       synchronized (this) {
+        int lockSegmentSize = 
PipeConfig.getInstance().getPipeTsFileResourceSegmentLockNum();
         if (locks == null) {
-          int lockSegmentSize = SEGMENT_LOCK_MIN_SIZE;
-          try {
-            lockSegmentSize = 
StorageEngine.getInstance().getAllDataRegionIds().size();
-          } catch (final Exception e) {
-            LOGGER.warn(
-                "Cannot get data region ids, use default lock segment size: 
{}", lockSegmentSize);
+          if (lockSegmentSize <= 0) {
+            try {
+              lockSegmentSize =
+                  Math.min(
+                      Math.max(
+                          
StorageEngine.getInstance().getAllDataRegionIds().size(),
+                          SEGMENT_LOCK_MIN_SIZE),
+                      SEGMENT_LOCK_MAX_SIZE);
+            } catch (final Exception e) {
+              LOGGER.warn(
+                  "Cannot get data region ids, use default lock segment size: 
{}", lockSegmentSize);
+              lockSegmentSize = SEGMENT_LOCK_MIN_SIZE;
+            }
           }
-          lockSegmentSize = Math.min(SEGMENT_LOCK_MAX_SIZE, lockSegmentSize);
-          lockSegmentSize = Math.max(SEGMENT_LOCK_MIN_SIZE, lockSegmentSize);
 
           final ReentrantLock[] tmpLocks = new ReentrantLock[lockSegmentSize];
           for (int i = 0; i < tmpLocks.length; i++) {
@@ -58,6 +63,13 @@ public class PipeTsFileResourceSegmentLock {
 
           // publish this variable
           locks = tmpLocks;
+        } else if (locks.length < lockSegmentSize) {
+          final ReentrantLock[] tmpLocks = new ReentrantLock[lockSegmentSize];
+          System.arraycopy(locks, 0, tmpLocks, 0, locks.length);
+          for (int i = locks.length; i < lockSegmentSize; ++i) {
+            tmpLocks[i] = new ReentrantLock();
+          }
+          locks = tmpLocks;
         }
       }
     }
@@ -74,39 +86,8 @@ public class PipeTsFileResourceSegmentLock {
     return locks[Math.abs(file.hashCode()) % locks.length].tryLock(timeout, 
timeUnit);
   }
 
-  public boolean tryLockAll(final long timeout, final TimeUnit timeUnit)
-      throws InterruptedException {
-    initIfNecessary();
-    int alreadyLocked = 0;
-    for (final ReentrantLock lock : locks) {
-      if (lock.tryLock(timeout, timeUnit)) {
-        alreadyLocked++;
-      } else {
-        break;
-      }
-    }
-
-    if (alreadyLocked == locks.length) {
-      return true;
-    } else {
-      unlockUntil(alreadyLocked);
-      return false;
-    }
-  }
-
-  private void unlockUntil(final int index) {
-    for (int i = 0; i < index; i++) {
-      locks[i].unlock();
-    }
-  }
-
   public void unlock(final File file) {
     initIfNecessary();
     locks[Math.abs(file.hashCode()) % locks.length].unlock();
   }
-
-  public void unlockAll() {
-    initIfNecessary();
-    unlockUntil(locks.length);
-  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index c5d0ecc558f..817bb3a494e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -329,6 +329,7 @@ public class CommonConfig {
   private volatile double 
pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold = 0.8d;
   private volatile boolean pipeTransferTsFileSync = false;
   private volatile long pipeCheckAllSyncClientLiveTimeIntervalMs = 5 * 60 * 
1000L; // 5 minutes
+  private int pipeTsFileResourceSegmentLockNum = -1;
 
   private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000L; // 8 
minutes
   private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L; 
// 3 minutes
@@ -1942,14 +1943,28 @@ public class CommonConfig {
   }
 
   public void setPipeCheckAllSyncClientLiveTimeIntervalMs(
-      long pipeCheckSyncAllClientLiveTimeIntervalMs) {
-    if (this.pipeCheckAllSyncClientLiveTimeIntervalMs == 
pipeCheckSyncAllClientLiveTimeIntervalMs) {
+      long pipeCheckAllSyncClientLiveTimeIntervalMs) {
+    if (this.pipeCheckAllSyncClientLiveTimeIntervalMs == 
pipeCheckAllSyncClientLiveTimeIntervalMs) {
       return;
     }
-    this.pipeCheckAllSyncClientLiveTimeIntervalMs = 
pipeCheckSyncAllClientLiveTimeIntervalMs;
+    this.pipeCheckAllSyncClientLiveTimeIntervalMs = 
pipeCheckAllSyncClientLiveTimeIntervalMs;
     logger.info(
         "pipeCheckSyncAllClientLiveTimeIntervalMs is set to {}",
-        pipeCheckSyncAllClientLiveTimeIntervalMs);
+        pipeCheckAllSyncClientLiveTimeIntervalMs);
+  }
+
+  public int getPipeTsFileResourceSegmentLockNum() {
+    return pipeTsFileResourceSegmentLockNum;
+  }
+
+  public void setPipeTsFileResourceSegmentLockNum(int 
pipeTsFileResourceSegmentLockNum) {
+    if (this.pipeTsFileResourceSegmentLockNum == 
pipeTsFileResourceSegmentLockNum) {
+      return;
+    }
+    this.pipeTsFileResourceSegmentLockNum = pipeTsFileResourceSegmentLockNum;
+    logger.info(
+        "pipeCheckSyncAllClientLiveTimeIntervalMs is set to {}",
+        pipeCheckAllSyncClientLiveTimeIntervalMs);
   }
 
   public double getPipeSendTsFileRateLimitBytesPerSecond() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 21605ee70e2..329d8815ac8 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -293,6 +293,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeCheckAllSyncClientLiveTimeIntervalMs();
   }
 
+  public int getPipeTsFileResourceSegmentLockNum() {
+    return COMMON_CONFIG.getPipeTsFileResourceSegmentLockNum();
+  }
+
   /////////////////////////////// Meta Consistency 
///////////////////////////////
 
   public boolean isSeperatedPipeHeartbeatEnabled() {
@@ -525,6 +529,9 @@ public class PipeConfig {
     LOGGER.info(
         "PipeCheckAllSyncClientLiveTimeIntervalMs: {}",
         getPipeCheckAllSyncClientLiveTimeIntervalMs());
+    LOGGER.info(
+        "PipeCheckAllSyncClientLiveTimeIntervalMs: {}",
+        getPipeCheckAllSyncClientLiveTimeIntervalMs());
 
     LOGGER.info("PipeDynamicMemoryHistoryWeight: {}", 
getPipeDynamicMemoryHistoryWeight());
     LOGGER.info(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 8009fa27dea..1beb4c67e25 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -498,6 +498,11 @@ public class PipeDescriptor {
             properties.getProperty(
                 "pipe_check_all_sync_client_live_time_interval_ms",
                 
String.valueOf(config.getPipeCheckAllSyncClientLiveTimeIntervalMs()))));
+    config.setPipeTsFileResourceSegmentLockNum(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_tsfile_resource_segment_lock_num",
+                
String.valueOf(config.getPipeTsFileResourceSegmentLockNum()))));
 
     config.setPipeRemainingTimeCommitRateAutoSwitchSeconds(
         Long.parseLong(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 62b1dc26b3d..b6316a42ddc 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -133,7 +133,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
               receiverId.get(),
               receiverFileDirWithIdSuffix.get().getPath());
         } catch (Exception e) {
-          LOGGER.warn(
+          PipeLogger.log(
+              LOGGER::warn,
               "Receiver id = {}: Failed to delete original receiver file dir 
{}, because {}.",
               receiverId.get(),
               receiverFileDirWithIdSuffix.get().getPath(),
@@ -163,13 +164,15 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       try {
         receiverFileBaseDir = getReceiverFileBaseDir();
         if (Objects.isNull(receiverFileBaseDir)) {
-          LOGGER.warn(
+          PipeLogger.log(
+              LOGGER::warn,
               "Receiver id = {}: Failed to init pipe receiver file folder 
manager because all disks of folders are full.",
               receiverId.get());
           return new 
TPipeTransferResp(StatusUtils.getStatus(TSStatusCode.DISK_SPACE_INSUFFICIENT));
         }
       } catch (Exception e) {
-        LOGGER.warn(
+        PipeLogger.log(
+            LOGGER::warn,
             "Receiver id = {}: Failed to create pipe receiver file folder 
because all disks of folders are full.",
             receiverId.get(),
             e);
@@ -191,10 +194,11 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
         }
       } catch (Exception ignored) {
       }
-      LOGGER.warn(
-          "Receiver id = {}: Failed to create receiver file dir {}.",
+      PipeLogger.log(
+          LOGGER::warn,
+          "Receiver id = %s: Failed to create receiver file dir %s.",
           receiverId.get(),
-          newReceiverDir.getPath());
+          Objects.nonNull(newReceiverDir) ? newReceiverDir.getPath() : null);
       markFileBaseDirStateAbnormal(receiverFileBaseDir);
     }
     return new TPipeTransferResp(
@@ -220,8 +224,11 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
           RpcUtils.getStatus(
               TSStatusCode.PIPE_HANDSHAKE_ERROR,
               "Receiver can not get clusterId from config node.");
-      LOGGER.warn(
-          "Receiver id = {}: Handshake failed, response status = {}.", 
receiverId.get(), status);
+      PipeLogger.log(
+          LOGGER::warn,
+          "Receiver id = {}: Handshake failed, response status = {}.",
+          receiverId.get(),
+          status);
       return new TPipeTransferResp(status);
     }
 
@@ -232,8 +239,11 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       final TSStatus status =
           RpcUtils.getStatus(
               TSStatusCode.PIPE_HANDSHAKE_ERROR, "Handshake request does not 
contain clusterId.");
-      LOGGER.warn(
-          "Receiver id = {}: Handshake failed, response status = {}.", 
receiverId.get(), status);
+      PipeLogger.log(
+          LOGGER::warn,
+          "Receiver id = %s: Handshake failed, response status = %s.",
+          receiverId.get(),
+          status);
       return new TPipeTransferResp(status);
     }
 
@@ -245,8 +255,11 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
               String.format(
                   "Receiver and sender are from the same cluster %s.",
                   clusterIdFromHandshakeRequest));
-      LOGGER.warn(
-          "Receiver id = {}: Handshake failed, response status = {}.", 
receiverId.get(), status);
+      PipeLogger.log(
+          LOGGER::warn,
+          "Receiver id = %s: Handshake failed, response status = %s.",
+          receiverId.get(),
+          status);
       return new TPipeTransferResp(status);
     }
 
@@ -258,8 +271,11 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
           RpcUtils.getStatus(
               TSStatusCode.PIPE_HANDSHAKE_ERROR,
               "Handshake request does not contain timestampPrecision.");
-      LOGGER.warn(
-          "Receiver id = {}: Handshake failed, response status = {}.", 
receiverId.get(), status);
+      PipeLogger.log(
+          LOGGER::warn,
+          "Receiver id = %s: Handshake failed, response status = %s.",
+          receiverId.get(),
+          status);
       return new TPipeTransferResp(status);
     }
 
@@ -275,7 +291,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
     }
     final TSStatus status = loginIfNecessary();
     if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      LOGGER.warn(
+      PipeLogger.log(
+          LOGGER::warn,
           "Receiver id = {}: Handshake failed because login failed, response 
status = {}.",
           receiverId.get(),
           status);
@@ -338,7 +355,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
     if (shouldLogin()) {
       final TSStatus permissionCheckStatus = login();
       if (permissionCheckStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        LOGGER.warn(
+        PipeLogger.log(
+            LOGGER::warn,
             "Receiver id = {}: Failed to login, username = {}, response = {}.",
             receiverId.get(),
             username,
@@ -381,8 +399,9 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
                 String.format(
                     "Request sender to reset file reader's offset from %s to 
%s.",
                     req.getStartWritingOffset(), writingFileWriter.length()));
-        LOGGER.warn(
-            "Receiver id = {}: File offset reset requested by receiver, 
response status = {}.",
+        PipeLogger.log(
+            LOGGER::warn,
+            "Receiver id = %s: File offset reset requested by receiver, 
response status = %s.",
             receiverId.get(),
             status);
         return PipeTransferFilePieceResp.toTPipeTransferResp(status, 
writingFileWriter.length());
@@ -392,8 +411,12 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       return PipeTransferFilePieceResp.toTPipeTransferResp(
           RpcUtils.SUCCESS_STATUS, writingFileWriter.length());
     } catch (final Exception e) {
-      LOGGER.warn(
-          "Receiver id = {}: Failed to write file piece from req {}.", 
receiverId.get(), req, e);
+      PipeLogger.log(
+          LOGGER::warn,
+          e,
+          "Receiver id = %s: Failed to write file piece from req {}.",
+          receiverId.get(),
+          req);
       final TSStatus status =
           RpcUtils.getStatus(
               TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
@@ -468,7 +491,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
             writingFile == null ? "null" : writingFile.getPath(),
             writingFile == null ? 0 : writingFile.length());
       } catch (final Exception e) {
-        LOGGER.warn(
+        PipeLogger.log(
+            LOGGER::warn,
             "Receiver id = {}: Failed to close current writing file writer {}, 
because {}.",
             receiverId.get(),
             writingFile == null ? "null" : writingFile.getPath(),
@@ -506,7 +530,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
             receiverId.get(),
             file.getPath());
       } catch (final Exception e) {
-        LOGGER.warn(
+        PipeLogger.log(
+            LOGGER::warn,
             "Receiver id = {}: Failed to delete original writing file {}, 
because {}.",
             receiverId.get(),
             file.getPath(),
@@ -526,7 +551,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
   private boolean isWritingFileOffsetCorrect(final long offset) throws 
IOException {
     final boolean offsetCorrect = writingFileWriter.length() == offset;
     if (!offsetCorrect) {
-      LOGGER.warn(
+      PipeLogger.log(
+          LOGGER::warn,
           "Receiver id = {}: Writing file {}'s offset is {}, but request 
sender's offset is {}.",
           receiverId.get(),
           writingFile.getPath(),
@@ -544,7 +570,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
                 TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
                 String.format(
                     "Failed to seal file, because writing file %s is not 
available.", writingFile));
-        LOGGER.warn(status.getMessage());
+        PipeLogger.log(LOGGER::warn, status.getMessage());
         return new TPipeTransferResp(status);
       }
 
@@ -580,7 +606,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
         LOGGER.info(
             "Receiver id = {}: Seal file {} successfully.", receiverId.get(), 
fileAbsolutePath);
       } else {
-        LOGGER.warn(
+        PipeLogger.log(
+            LOGGER::warn,
             "Receiver id = {}: Failed to seal file {}, because {}.",
             receiverId.get(),
             fileAbsolutePath,
@@ -588,7 +615,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       }
       return new TPipeTransferResp(status);
     } catch (final Exception e) {
-      LOGGER.warn(
+      PipeLogger.log(
+          LOGGER::warn,
           "Receiver id = {}: Failed to seal file {} from req {}.",
           receiverId.get(),
           writingFile,
@@ -626,7 +654,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
                 String.format(
                     "Failed to seal file %s, because writing file %s is not 
available.",
                     req.getFileNames(), writingFile));
-        LOGGER.warn(status.getMessage());
+        PipeLogger.log(LOGGER::warn, status.getMessage());
         return new TPipeTransferResp(status);
       }
 
@@ -675,7 +703,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
         LOGGER.info(
             "Receiver id = {}: Seal file {} successfully.", receiverId.get(), 
fileAbsolutePaths);
       } else {
-        LOGGER.warn(
+        PipeLogger.log(
+            LOGGER::warn,
             "Receiver id = {}: Failed to seal file {}, status is {}.",
             receiverId.get(),
             fileAbsolutePaths,
@@ -683,8 +712,13 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       }
       return new TPipeTransferResp(status);
     } catch (final Exception e) {
-      LOGGER.warn(
-          "Receiver id = {}: Failed to seal file {} from req {}.", 
receiverId.get(), files, req, e);
+      PipeLogger.log(
+          LOGGER::warn,
+          "Receiver id = {}: Failed to seal file {} from req {}.",
+          receiverId.get(),
+          files,
+          req,
+          e);
       return new TPipeTransferResp(
           RpcUtils.getStatus(
               TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
@@ -707,7 +741,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
           RpcUtils.getStatus(
               TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
               String.format("Failed to seal file %s, the file does not 
exist.", fileName));
-      LOGGER.warn(
+      PipeLogger.log(
+          LOGGER::warn,
           "Receiver id = {}: Failed to seal file {}, because the file does not 
exist.",
           receiverId.get(),
           fileName);
@@ -722,7 +757,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
                   "Failed to seal file %s, because the length of file is not 
correct. "
                       + "The original file has length %s, but receiver file 
has length %s.",
                   fileName, fileLength, writingFileWriter.length()));
-      LOGGER.warn(
+      PipeLogger.log(
+          LOGGER::warn,
           "Receiver id = {}: Failed to seal file {}, because the length of 
file is not correct. "
               + "The original file has length {}, but receiver file has length 
{}.",
           receiverId.get(),
@@ -743,7 +779,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
               TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
               String.format(
                   "Failed to seal file %s, because writing file is %s.", 
fileName, writingFile));
-      LOGGER.warn(
+      PipeLogger.log(
+          LOGGER::warn,
           "Receiver id = {}: Failed to seal file {}, because writing file is 
{}.",
           receiverId.get(),
           fileName,
@@ -759,7 +796,8 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
                   "Failed to seal file %s, because the length of file is not 
correct. "
                       + "The original file has length %s, but receiver file 
has length %s.",
                   fileName, fileLength, writingFileWriter.length()));
-      LOGGER.warn(
+      PipeLogger.log(
+          LOGGER::warn,
           "Receiver id = {}: Failed to seal file {}, because the length of 
file is not correct. "
               + "The original file has length {}, but receiver file has length 
{}.",
           receiverId.get(),


Reply via email to