This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ded266441d8 Pipe: Further reduce the pipe logs & Added configurations
for tsFile segment lock (#16315)
ded266441d8 is described below
commit ded266441d865c294a963ae8c113364b30384e54
Author: Caideyipi <[email protected]>
AuthorDate: Tue Sep 2 09:39:46 2025 +0800
Pipe: Further reduce the pipe logs & Added configurations for tsFile
segment lock (#16315)
* logger-fix
* add-ons
* fix
* fix
* fix
* warn-fix
* further-fix
* fix
---
.../confignode/persistence/pipe/PipeTaskInfo.java | 2 +-
.../impl/pipe/AbstractOperatePipeProcedureV2.java | 10 +-
.../impl/pipe/runtime/PipeMetaSyncProcedure.java | 18 ++--
.../protocol/thrift/IoTDBDataNodeReceiver.java | 29 +++---
.../tsfile/PipeTsFileResourceSegmentLock.java | 63 +++++-------
.../apache/iotdb/commons/conf/CommonConfig.java | 23 ++++-
.../iotdb/commons/pipe/config/PipeConfig.java | 7 ++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 5 +
.../commons/pipe/receiver/IoTDBFileReceiver.java | 106 ++++++++++++++-------
9 files changed, 158 insertions(+), 105 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 6aadee63719..7b2fd6ad9b4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -655,7 +655,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
}
private TSStatus handleMetaChangesInternal(final PipeHandleMetaChangePlan
plan) {
- LOGGER.info("Handling pipe meta changes ...");
+ LOGGER.debug("Handling pipe meta changes ...");
pipeMetaKeeper.clear();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index c48c450157c..25466d33983 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -109,12 +109,12 @@ public abstract class AbstractOperatePipeProcedureV2
@Override
protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv
configNodeProcedureEnv) {
- LOGGER.info("ProcedureId {} try to acquire pipe lock.", getProcId());
+ LOGGER.debug("ProcedureId {} try to acquire pipe lock.", getProcId());
pipeTaskInfo = acquireLockInternal(configNodeProcedureEnv);
if (pipeTaskInfo == null) {
LOGGER.warn("ProcedureId {} failed to acquire pipe lock.", getProcId());
} else {
- LOGGER.info("ProcedureId {} acquired pipe lock.", getProcId());
+ LOGGER.debug("ProcedureId {} acquired pipe lock.", getProcId());
}
final ProcedureLockState procedureLockState =
super.acquireLock(configNodeProcedureEnv);
@@ -125,7 +125,7 @@ public abstract class AbstractOperatePipeProcedureV2
"ProcedureId {}: LOCK_ACQUIRED. The following procedure should
not be executed without pipe lock.",
getProcId());
} else {
- LOGGER.info(
+ LOGGER.debug(
"ProcedureId {}: LOCK_ACQUIRED. The following procedure should
be executed with pipe lock.",
getProcId());
}
@@ -134,7 +134,7 @@ public abstract class AbstractOperatePipeProcedureV2
if (pipeTaskInfo == null) {
LOGGER.warn("ProcedureId {}: LOCK_EVENT_WAIT. Without acquiring pipe
lock.", getProcId());
} else {
- LOGGER.info("ProcedureId {}: LOCK_EVENT_WAIT. Pipe lock will be
released.", getProcId());
+ LOGGER.debug("ProcedureId {}: LOCK_EVENT_WAIT. Pipe lock will be
released.", getProcId());
configNodeProcedureEnv
.getConfigManager()
.getPipeManager()
@@ -173,7 +173,7 @@ public abstract class AbstractOperatePipeProcedureV2
if (pipeTaskInfo == null) {
LOGGER.warn("ProcedureId {} release lock. No need to release pipe
lock.", getProcId());
} else {
- LOGGER.info("ProcedureId {} release lock. Pipe lock will be released.",
getProcId());
+ LOGGER.debug("ProcedureId {} release lock. Pipe lock will be released.",
getProcId());
if (this instanceof PipeMetaSyncProcedure) {
configNodeProcedureEnv
.getConfigManager()
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index 20e23a64701..393a8bd5ab8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -87,7 +87,7 @@ public class PipeMetaSyncProcedure extends
AbstractOperatePipeProcedureV2 {
if (System.currentTimeMillis() - LAST_EXECUTION_TIME.get() <
MIN_EXECUTION_INTERVAL_MS) {
// Skip by setting the pipeTaskInfo to null
pipeTaskInfo = null;
- LOGGER.info(
+ LOGGER.debug(
"PipeMetaSyncProcedure: acquireLock, skip the procedure due to the
last execution time {}",
LAST_EXECUTION_TIME.get());
return ProcedureLockState.LOCK_ACQUIRED;
@@ -103,7 +103,7 @@ public class PipeMetaSyncProcedure extends
AbstractOperatePipeProcedureV2 {
@Override
public boolean executeFromValidateTask(ConfigNodeProcedureEnv env) {
- LOGGER.info("PipeMetaSyncProcedure: executeFromValidateTask");
+ LOGGER.debug("PipeMetaSyncProcedure: executeFromValidateTask");
LAST_EXECUTION_TIME.set(System.currentTimeMillis());
return true;
@@ -111,7 +111,7 @@ public class PipeMetaSyncProcedure extends
AbstractOperatePipeProcedureV2 {
@Override
public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
- LOGGER.info("PipeMetaSyncProcedure: executeFromCalculateInfoForTask");
+ LOGGER.debug("PipeMetaSyncProcedure: executeFromCalculateInfoForTask");
// Re-balance the external source tasks here in case of any changes in the
dataRegion
pipeTaskInfo
@@ -170,7 +170,7 @@ public class PipeMetaSyncProcedure extends
AbstractOperatePipeProcedureV2 {
@Override
public void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
- LOGGER.info("PipeMetaSyncProcedure: executeFromWriteConfigNodeConsensus");
+ LOGGER.debug("PipeMetaSyncProcedure: executeFromWriteConfigNodeConsensus");
final List<PipeMeta> pipeMetaList = new ArrayList<>();
for (final PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) {
@@ -196,7 +196,7 @@ public class PipeMetaSyncProcedure extends
AbstractOperatePipeProcedureV2 {
@Override
public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
throws PipeException, IOException {
- LOGGER.info("PipeMetaSyncProcedure: executeFromOperateOnDataNodes");
+ LOGGER.debug("PipeMetaSyncProcedure: executeFromOperateOnDataNodes");
Map<Integer, TPushPipeMetaResp> respMap = pushPipeMetaToDataNodes(env);
if (pipeTaskInfo.get().recordDataNodePushPipeMetaExceptions(respMap)) {
@@ -209,28 +209,28 @@ public class PipeMetaSyncProcedure extends
AbstractOperatePipeProcedureV2 {
@Override
public void rollbackFromValidateTask(ConfigNodeProcedureEnv env) {
- LOGGER.info("PipeMetaSyncProcedure: rollbackFromValidateTask");
+ LOGGER.debug("PipeMetaSyncProcedure: rollbackFromValidateTask");
// Do nothing
}
@Override
public void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
- LOGGER.info("PipeMetaSyncProcedure: rollbackFromCalculateInfoForTask");
+ LOGGER.debug("PipeMetaSyncProcedure: rollbackFromCalculateInfoForTask");
// Do nothing
}
@Override
public void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env)
{
- LOGGER.info("PipeMetaSyncProcedure: rollbackFromWriteConfigNodeConsensus");
+ LOGGER.debug("PipeMetaSyncProcedure:
rollbackFromWriteConfigNodeConsensus");
// Do nothing
}
@Override
public void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
- LOGGER.info("PipeMetaSyncProcedure: rollbackFromOperateOnDataNodes");
+ LOGGER.debug("PipeMetaSyncProcedure: rollbackFromOperateOnDataNodes");
// Do nothing
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index cb2b86efafc..0469bb7b152 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import
org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqHandler;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
@@ -449,7 +450,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
} catch (final Exception e) {
final String error =
String.format("Exception %s encountered while handling request %s.",
e.getMessage(), req);
- LOGGER.warn("Receiver id = {}: {}", receiverId.get(), error, e);
+ PipeLogger.log(LOGGER::warn, e, "Receiver id = %s: %s",
receiverId.get(), error);
return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR,
error));
}
}
@@ -661,7 +662,8 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
final TSStatus status =
((AlterLogicalViewNode)
req.getPlanNode()).checkPermissionBeforeProcess(username);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
"Receiver id = {}: Failed to check authority for statement {},
username = {}, response = {}.",
receiverId.get(),
StatementType.ALTER_LOGICAL_VIEW.name(),
@@ -812,7 +814,8 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
|| result.getCode() ==
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
return result;
} else {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
"Receiver id = {}: Failure status encountered while executing
statement {}: {}",
receiverId.get(),
statement,
@@ -820,7 +823,8 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
return statement.accept(STATEMENT_STATUS_VISITOR, result);
}
} catch (final Exception e) {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
"Receiver id = {}: Exception encountered while executing statement
{}: ",
receiverId.get(),
statement,
@@ -873,7 +877,8 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
final TSStatus permissionCheckStatus =
AuthorityChecker.checkAuthority(statement, clientSession);
if (permissionCheckStatus.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
"Receiver id = {}: Failed to check authority for statement {},
username = {}, response = {}.",
receiverId.get(),
statement.getType().name(),
@@ -1054,19 +1059,21 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
// No strong need to handle the failure result
if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
|| result.getCode() ==
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())) {
- LOGGER.warn(
- "Receiver id = {}: Failure status encountered while executing
statement {}: {}",
+ PipeLogger.log(
+ LOGGER::warn,
+ "Receiver id = %s: Failure status encountered while executing
statement %s: %s",
receiverId.get(),
statement,
result);
}
return result;
} catch (final Exception e) {
- LOGGER.warn(
- "Receiver id = {}: Exception encountered while executing statement
{}: ",
+ PipeLogger.log(
+ LOGGER::warn,
+ e,
+ "Receiver id = %s: Exception encountered while executing statement
%s: ",
receiverId.get(),
- statement,
- e);
+ statement);
return new
TSStatus(TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR.getStatusCode())
.setMessage(e.getMessage());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java
index d1bf7fef2ad..cd1be83e55f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.resource.tsfile;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.slf4j.Logger;
@@ -31,25 +32,29 @@ import java.util.concurrent.locks.ReentrantLock;
public class PipeTsFileResourceSegmentLock {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTsFileResourceSegmentLock.class);
-
private static final int SEGMENT_LOCK_MIN_SIZE = 32;
private static final int SEGMENT_LOCK_MAX_SIZE = 128;
-
private volatile ReentrantLock[] locks;
private void initIfNecessary() {
if (locks == null) {
synchronized (this) {
+ int lockSegmentSize =
PipeConfig.getInstance().getPipeTsFileResourceSegmentLockNum();
if (locks == null) {
- int lockSegmentSize = SEGMENT_LOCK_MIN_SIZE;
- try {
- lockSegmentSize =
StorageEngine.getInstance().getAllDataRegionIds().size();
- } catch (final Exception e) {
- LOGGER.warn(
- "Cannot get data region ids, use default lock segment size:
{}", lockSegmentSize);
+ if (lockSegmentSize <= 0) {
+ try {
+ lockSegmentSize =
+ Math.min(
+ Math.max(
+
StorageEngine.getInstance().getAllDataRegionIds().size(),
+ SEGMENT_LOCK_MIN_SIZE),
+ SEGMENT_LOCK_MAX_SIZE);
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Cannot get data region ids, use default lock segment size:
{}", lockSegmentSize);
+ lockSegmentSize = SEGMENT_LOCK_MIN_SIZE;
+ }
}
- lockSegmentSize = Math.min(SEGMENT_LOCK_MAX_SIZE, lockSegmentSize);
- lockSegmentSize = Math.max(SEGMENT_LOCK_MIN_SIZE, lockSegmentSize);
final ReentrantLock[] tmpLocks = new ReentrantLock[lockSegmentSize];
for (int i = 0; i < tmpLocks.length; i++) {
@@ -58,6 +63,13 @@ public class PipeTsFileResourceSegmentLock {
// publish this variable
locks = tmpLocks;
+ } else if (locks.length < lockSegmentSize) {
+ final ReentrantLock[] tmpLocks = new ReentrantLock[lockSegmentSize];
+ System.arraycopy(locks, 0, tmpLocks, 0, locks.length);
+ for (int i = locks.length; i < lockSegmentSize; ++i) {
+ tmpLocks[i] = new ReentrantLock();
+ }
+ locks = tmpLocks;
}
}
}
@@ -74,39 +86,8 @@ public class PipeTsFileResourceSegmentLock {
return locks[Math.abs(file.hashCode()) % locks.length].tryLock(timeout,
timeUnit);
}
- public boolean tryLockAll(final long timeout, final TimeUnit timeUnit)
- throws InterruptedException {
- initIfNecessary();
- int alreadyLocked = 0;
- for (final ReentrantLock lock : locks) {
- if (lock.tryLock(timeout, timeUnit)) {
- alreadyLocked++;
- } else {
- break;
- }
- }
-
- if (alreadyLocked == locks.length) {
- return true;
- } else {
- unlockUntil(alreadyLocked);
- return false;
- }
- }
-
- private void unlockUntil(final int index) {
- for (int i = 0; i < index; i++) {
- locks[i].unlock();
- }
- }
-
public void unlock(final File file) {
initIfNecessary();
locks[Math.abs(file.hashCode()) % locks.length].unlock();
}
-
- public void unlockAll() {
- initIfNecessary();
- unlockUntil(locks.length);
- }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index c5d0ecc558f..817bb3a494e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -329,6 +329,7 @@ public class CommonConfig {
private volatile double
pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold = 0.8d;
private volatile boolean pipeTransferTsFileSync = false;
private volatile long pipeCheckAllSyncClientLiveTimeIntervalMs = 5 * 60 *
1000L; // 5 minutes
+ private int pipeTsFileResourceSegmentLockNum = -1;
private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000L; // 8
minutes
private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L;
// 3 minutes
@@ -1942,14 +1943,28 @@ public class CommonConfig {
}
public void setPipeCheckAllSyncClientLiveTimeIntervalMs(
- long pipeCheckSyncAllClientLiveTimeIntervalMs) {
- if (this.pipeCheckAllSyncClientLiveTimeIntervalMs ==
pipeCheckSyncAllClientLiveTimeIntervalMs) {
+ long pipeCheckAllSyncClientLiveTimeIntervalMs) {
+ if (this.pipeCheckAllSyncClientLiveTimeIntervalMs ==
pipeCheckAllSyncClientLiveTimeIntervalMs) {
return;
}
- this.pipeCheckAllSyncClientLiveTimeIntervalMs =
pipeCheckSyncAllClientLiveTimeIntervalMs;
+ this.pipeCheckAllSyncClientLiveTimeIntervalMs =
pipeCheckAllSyncClientLiveTimeIntervalMs;
logger.info(
"pipeCheckSyncAllClientLiveTimeIntervalMs is set to {}",
- pipeCheckSyncAllClientLiveTimeIntervalMs);
+ pipeCheckAllSyncClientLiveTimeIntervalMs);
+ }
+
+ public int getPipeTsFileResourceSegmentLockNum() {
+ return pipeTsFileResourceSegmentLockNum;
+ }
+
+ public void setPipeTsFileResourceSegmentLockNum(int
pipeTsFileResourceSegmentLockNum) {
+ if (this.pipeTsFileResourceSegmentLockNum ==
pipeTsFileResourceSegmentLockNum) {
+ return;
+ }
+ this.pipeTsFileResourceSegmentLockNum = pipeTsFileResourceSegmentLockNum;
+ logger.info(
+ "pipeCheckSyncAllClientLiveTimeIntervalMs is set to {}",
+ pipeCheckAllSyncClientLiveTimeIntervalMs);
}
public double getPipeSendTsFileRateLimitBytesPerSecond() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 21605ee70e2..329d8815ac8 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -293,6 +293,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeCheckAllSyncClientLiveTimeIntervalMs();
}
+ public int getPipeTsFileResourceSegmentLockNum() {
+ return COMMON_CONFIG.getPipeTsFileResourceSegmentLockNum();
+ }
+
/////////////////////////////// Meta Consistency
///////////////////////////////
public boolean isSeperatedPipeHeartbeatEnabled() {
@@ -525,6 +529,9 @@ public class PipeConfig {
LOGGER.info(
"PipeCheckAllSyncClientLiveTimeIntervalMs: {}",
getPipeCheckAllSyncClientLiveTimeIntervalMs());
+ LOGGER.info(
+ "PipeCheckAllSyncClientLiveTimeIntervalMs: {}",
+ getPipeCheckAllSyncClientLiveTimeIntervalMs());
LOGGER.info("PipeDynamicMemoryHistoryWeight: {}",
getPipeDynamicMemoryHistoryWeight());
LOGGER.info(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 8009fa27dea..1beb4c67e25 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -498,6 +498,11 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_check_all_sync_client_live_time_interval_ms",
String.valueOf(config.getPipeCheckAllSyncClientLiveTimeIntervalMs()))));
+ config.setPipeTsFileResourceSegmentLockNum(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_tsfile_resource_segment_lock_num",
+
String.valueOf(config.getPipeTsFileResourceSegmentLockNum()))));
config.setPipeRemainingTimeCommitRateAutoSwitchSeconds(
Long.parseLong(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 62b1dc26b3d..b6316a42ddc 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -133,7 +133,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
receiverId.get(),
receiverFileDirWithIdSuffix.get().getPath());
} catch (Exception e) {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
"Receiver id = {}: Failed to delete original receiver file dir
{}, because {}.",
receiverId.get(),
receiverFileDirWithIdSuffix.get().getPath(),
@@ -163,13 +164,15 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
try {
receiverFileBaseDir = getReceiverFileBaseDir();
if (Objects.isNull(receiverFileBaseDir)) {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
"Receiver id = {}: Failed to init pipe receiver file folder
manager because all disks of folders are full.",
receiverId.get());
return new
TPipeTransferResp(StatusUtils.getStatus(TSStatusCode.DISK_SPACE_INSUFFICIENT));
}
} catch (Exception e) {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
"Receiver id = {}: Failed to create pipe receiver file folder
because all disks of folders are full.",
receiverId.get(),
e);
@@ -191,10 +194,11 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
}
} catch (Exception ignored) {
}
- LOGGER.warn(
- "Receiver id = {}: Failed to create receiver file dir {}.",
+ PipeLogger.log(
+ LOGGER::warn,
+ "Receiver id = %s: Failed to create receiver file dir %s.",
receiverId.get(),
- newReceiverDir.getPath());
+ Objects.nonNull(newReceiverDir) ? newReceiverDir.getPath() : null);
markFileBaseDirStateAbnormal(receiverFileBaseDir);
}
return new TPipeTransferResp(
@@ -220,8 +224,11 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
RpcUtils.getStatus(
TSStatusCode.PIPE_HANDSHAKE_ERROR,
"Receiver can not get clusterId from config node.");
- LOGGER.warn(
- "Receiver id = {}: Handshake failed, response status = {}.",
receiverId.get(), status);
+ PipeLogger.log(
+ LOGGER::warn,
+ "Receiver id = {}: Handshake failed, response status = {}.",
+ receiverId.get(),
+ status);
return new TPipeTransferResp(status);
}
@@ -232,8 +239,11 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
final TSStatus status =
RpcUtils.getStatus(
TSStatusCode.PIPE_HANDSHAKE_ERROR, "Handshake request does not
contain clusterId.");
- LOGGER.warn(
- "Receiver id = {}: Handshake failed, response status = {}.",
receiverId.get(), status);
+ PipeLogger.log(
+ LOGGER::warn,
+ "Receiver id = %s: Handshake failed, response status = %s.",
+ receiverId.get(),
+ status);
return new TPipeTransferResp(status);
}
@@ -245,8 +255,11 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
String.format(
"Receiver and sender are from the same cluster %s.",
clusterIdFromHandshakeRequest));
- LOGGER.warn(
- "Receiver id = {}: Handshake failed, response status = {}.",
receiverId.get(), status);
+ PipeLogger.log(
+ LOGGER::warn,
+ "Receiver id = %s: Handshake failed, response status = %s.",
+ receiverId.get(),
+ status);
return new TPipeTransferResp(status);
}
@@ -258,8 +271,11 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
RpcUtils.getStatus(
TSStatusCode.PIPE_HANDSHAKE_ERROR,
"Handshake request does not contain timestampPrecision.");
- LOGGER.warn(
- "Receiver id = {}: Handshake failed, response status = {}.",
receiverId.get(), status);
+ PipeLogger.log(
+ LOGGER::warn,
+ "Receiver id = %s: Handshake failed, response status = %s.",
+ receiverId.get(),
+ status);
return new TPipeTransferResp(status);
}
@@ -275,7 +291,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
}
final TSStatus status = loginIfNecessary();
if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
"Receiver id = {}: Handshake failed because login failed, response
status = {}.",
receiverId.get(),
status);
@@ -338,7 +355,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
if (shouldLogin()) {
final TSStatus permissionCheckStatus = login();
if (permissionCheckStatus.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
"Receiver id = {}: Failed to login, username = {}, response = {}.",
receiverId.get(),
username,
@@ -381,8 +399,9 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
String.format(
"Request sender to reset file reader's offset from %s to
%s.",
req.getStartWritingOffset(), writingFileWriter.length()));
- LOGGER.warn(
- "Receiver id = {}: File offset reset requested by receiver,
response status = {}.",
+ PipeLogger.log(
+ LOGGER::warn,
+ "Receiver id = %s: File offset reset requested by receiver,
response status = %s.",
receiverId.get(),
status);
return PipeTransferFilePieceResp.toTPipeTransferResp(status,
writingFileWriter.length());
@@ -392,8 +411,12 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
return PipeTransferFilePieceResp.toTPipeTransferResp(
RpcUtils.SUCCESS_STATUS, writingFileWriter.length());
} catch (final Exception e) {
- LOGGER.warn(
- "Receiver id = {}: Failed to write file piece from req {}.",
receiverId.get(), req, e);
+ PipeLogger.log(
+ LOGGER::warn,
+ e,
+ "Receiver id = %s: Failed to write file piece from req {}.",
+ receiverId.get(),
+ req);
final TSStatus status =
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
@@ -468,7 +491,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
writingFile == null ? "null" : writingFile.getPath(),
writingFile == null ? 0 : writingFile.length());
} catch (final Exception e) {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
"Receiver id = {}: Failed to close current writing file writer {},
because {}.",
receiverId.get(),
writingFile == null ? "null" : writingFile.getPath(),
@@ -506,7 +530,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
receiverId.get(),
file.getPath());
} catch (final Exception e) {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
"Receiver id = {}: Failed to delete original writing file {},
because {}.",
receiverId.get(),
file.getPath(),
@@ -526,7 +551,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
private boolean isWritingFileOffsetCorrect(final long offset) throws
IOException {
final boolean offsetCorrect = writingFileWriter.length() == offset;
if (!offsetCorrect) {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
"Receiver id = {}: Writing file {}'s offset is {}, but request
sender's offset is {}.",
receiverId.get(),
writingFile.getPath(),
@@ -544,7 +570,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
String.format(
"Failed to seal file, because writing file %s is not
available.", writingFile));
- LOGGER.warn(status.getMessage());
+ PipeLogger.log(LOGGER::warn, status.getMessage());
return new TPipeTransferResp(status);
}
@@ -580,7 +606,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
LOGGER.info(
"Receiver id = {}: Seal file {} successfully.", receiverId.get(),
fileAbsolutePath);
} else {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
"Receiver id = {}: Failed to seal file {}, because {}.",
receiverId.get(),
fileAbsolutePath,
@@ -588,7 +615,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
}
return new TPipeTransferResp(status);
} catch (final Exception e) {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
"Receiver id = {}: Failed to seal file {} from req {}.",
receiverId.get(),
writingFile,
@@ -626,7 +654,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
String.format(
"Failed to seal file %s, because writing file %s is not
available.",
req.getFileNames(), writingFile));
- LOGGER.warn(status.getMessage());
+ PipeLogger.log(LOGGER::warn, status.getMessage());
return new TPipeTransferResp(status);
}
@@ -675,7 +703,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
LOGGER.info(
"Receiver id = {}: Seal file {} successfully.", receiverId.get(),
fileAbsolutePaths);
} else {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
"Receiver id = {}: Failed to seal file {}, status is {}.",
receiverId.get(),
fileAbsolutePaths,
@@ -683,8 +712,13 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
}
return new TPipeTransferResp(status);
} catch (final Exception e) {
- LOGGER.warn(
- "Receiver id = {}: Failed to seal file {} from req {}.",
receiverId.get(), files, req, e);
+ PipeLogger.log(
+ LOGGER::warn,
+ "Receiver id = {}: Failed to seal file {} from req {}.",
+ receiverId.get(),
+ files,
+ req,
+ e);
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
@@ -707,7 +741,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
String.format("Failed to seal file %s, the file does not
exist.", fileName));
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
"Receiver id = {}: Failed to seal file {}, because the file does not
exist.",
receiverId.get(),
fileName);
@@ -722,7 +757,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
"Failed to seal file %s, because the length of file is not
correct. "
+ "The original file has length %s, but receiver file
has length %s.",
fileName, fileLength, writingFileWriter.length()));
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
"Receiver id = {}: Failed to seal file {}, because the length of
file is not correct. "
+ "The original file has length {}, but receiver file has length
{}.",
receiverId.get(),
@@ -743,7 +779,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
String.format(
"Failed to seal file %s, because writing file is %s.",
fileName, writingFile));
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
"Receiver id = {}: Failed to seal file {}, because writing file is
{}.",
receiverId.get(),
fileName,
@@ -759,7 +796,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
"Failed to seal file %s, because the length of file is not
correct. "
+ "The original file has length %s, but receiver file
has length %s.",
fileName, fileLength, writingFileWriter.length()));
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
"Receiver id = {}: Failed to seal file {}, because the length of
file is not correct. "
+ "The original file has length {}, but receiver file has length
{}.",
receiverId.get(),