This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new aa7b275318c Pipe: Optimize pipe logging with shared PipePeriodicalLogReducer (#17887)
aa7b275318c is described below
commit aa7b275318c71dc905c1367a40a3931ff0e59e81
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Jun 11 17:16:07 2026 +0800
Pipe: Optimize pipe logging with shared PipePeriodicalLogReducer (#17887)
* Optimize pipe logging with shared PipePeriodicalLogReducer
Move PipePeriodicalLogReducer to node-commons and route ConfigNode and
DataNode pipe logs through PipeLogger to rate-limit repetitive messages.
* Hot-reload PipePeriodicalLogReducer settings on ConfigNode
Apply pipe log reducer cache updates during ConfigNode configuration reload so that
pipe_periodical_log_min_interval_seconds and pipe_logger_cache_max_size_in_bytes
take effect without a restart.
Co-authored-by: Cursor <[email protected]>
* Remove redundant log when pipeMetaFromAgent is null in heartbeat parser.
Co-authored-by: Cursor <[email protected]>
* Fix missing i18n for pipe log messages on ConfigNode.
Move hardcoded heartbeat and extractor-close logs into ManagerMessages with
zh translations.
Co-authored-by: Cursor <[email protected]>
---------
Co-authored-by: Cursor <[email protected]>
Co-authored-by: Caideyipi <[email protected]>
---
.../iotdb/confignode/i18n/ManagerMessages.java | 8 +-
.../iotdb/confignode/i18n/ManagerMessages.java | 14 +++-
.../confignode/conf/ConfigNodeDescriptor.java | 10 ++-
.../agent/runtime/PipeConfigNodeRuntimeAgent.java | 17 ++--
.../pipe/agent/task/PipeConfigNodeSubtask.java | 21 ++---
.../pipe/coordinator/runtime/PipeMetaSyncer.java | 26 ++++--
.../runtime/heartbeat/PipeHeartbeatParser.java | 43 +++++-----
.../runtime/heartbeat/PipeHeartbeatScheduler.java | 7 +-
.../pipe/coordinator/task/PipeTaskCoordinator.java | 8 +-
.../coordinator/task/PipeTaskCoordinatorLock.java | 10 ++-
.../confignode/persistence/pipe/PipeTaskInfo.java | 9 ++-
.../runtime/PipeHandleLeaderChangeProcedure.java | 36 ++++++---
.../runtime/PipeHandleMetaChangeProcedure.java | 33 +++++---
.../impl/pipe/runtime/PipeMetaSyncProcedure.java | 6 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 2 +-
.../agent/runtime/PipeDataNodeRuntimeAgent.java | 14 ++--
.../protocol/thrift/IoTDBDataNodeReceiver.java | 2 +-
.../resource/log/PipePeriodicalLogReducer.java | 93 ----------------------
.../commons/pipe/resource/log/PipeLogger.java | 20 +++--
.../resource/log/PipePeriodicalLogReducer.java | 76 ++++++++++++++++++
.../commons/pipe/resource/PipeLoggerTest.java | 14 ++++
21 files changed, 287 insertions(+), 182 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java
b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java
index 5dbfe151576..d3df427120d 100644
---
a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java
+++
b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java
@@ -63,10 +63,16 @@ public final class ManagerMessages {
public static final String DECREASE_REFERENCE_COUNT_FOR_SNAPSHOT_ERROR =
"Decrease reference count for snapshot {} error.";
public static final String DELETING_REGIONS_COSTS_MS = "Deleting regions
costs {}ms";
+ public static final String
DETECTED_HISTORICAL_PIPE_COMPLETION_REPORT_FROM_DATANODE =
+ "Detected historical pipe completion report from DataNode {} for pipe
{}. remainingEventCount: {}, remainingTime: {}, completedDataNodes: {}";
public static final String DETECTED_COMPLETION_OF_PIPE_STATIC_META_REMOVE_IT
=
"Detected completion of pipe {}, static meta: {}, remove it.";
+ public static final String ALL_DATANODES_REPORTED_HISTORICAL_PIPE_COMPLETED =
+ "All DataNodes reported historical pipe {} completed.
globalRemainingEventCount: {}, globalRemainingTime: {}, staticMeta: {}";
public static final String
DETECT_PIPERUNTIMECRITICALEXCEPTION_FROM_AGENT_STOP_PIPE =
"Detect PipeRuntimeCriticalException {} from agent, stop pipe {}.";
+ public static final String
DETECT_PIPERUNTIMESINKCRITICALEXCEPTION_FROM_AGENT_STOP_PIPE =
+ "Detect PipeRuntimeSinkCriticalException {} from agent, stop pipe {}.";
public static final String ENABLE_SEPARATION_OF_POWERS_IS_NOT_SUPPORTED =
"Enable separation of powers is not supported";
public static final String ENDEXECUTECQ_TIME_RANGE_IS_CURRENT_TIME_IS =
@@ -113,7 +119,7 @@ public final class ManagerMessages {
public static final String
FAILED_TO_CLOSE_CONSUMER_IN_CONSUMER_GROUP_RESULT_STATUS =
"Failed to close consumer {} in consumer group {}. Result status: {}.";
public static final String
FAILED_TO_CLOSE_EXTRACTOR_AFTER_FAILED_TO_INITIALIZE_EXTRACTOR =
- "Failed to close extractor after failed to initialize extractor. ";
+ "Failed to close extractor after failed to initialize extractor. Ignore
this exception.";
public static final String
FAILED_TO_CLOSE_SINK_AFTER_FAILED_TO_INITIALIZE_IT_IGNORE =
"Failed to close sink after failed to initialize it. Ignore this
exception.";
public static final String FAILED_TO_COLLECT_COMMITCREATETABLEPLAN =
diff --git
a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java
b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java
index e7321e5376c..5164d8e1229 100644
---
a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java
+++
b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java
@@ -63,10 +63,16 @@ public final class ManagerMessages {
public static final String DECREASE_REFERENCE_COUNT_FOR_SNAPSHOT_ERROR =
"Decrease reference count for snapshot {} error.";
public static final String DELETING_REGIONS_COSTS_MS = "Deleting regions
costs {}ms";
+ public static final String
DETECTED_HISTORICAL_PIPE_COMPLETION_REPORT_FROM_DATANODE =
+ "检测到来自 DataNode {} 的历史 pipe 完成上报,pipe {}。remainingEventCount: {},
remainingTime: {}, completedDataNodes: {}";
public static final String DETECTED_COMPLETION_OF_PIPE_STATIC_META_REMOVE_IT
=
- "Detected completion of pipe {}, static meta: {}, remove it.";
+ "检测到 pipe {} 已完成,static meta: {},将其移除。";
+ public static final String ALL_DATANODES_REPORTED_HISTORICAL_PIPE_COMPLETED =
+ "所有 DataNode 均已上报历史 pipe {} 完成。globalRemainingEventCount: {},
globalRemainingTime: {}, staticMeta: {}";
public static final String
DETECT_PIPERUNTIMECRITICALEXCEPTION_FROM_AGENT_STOP_PIPE =
- "Detect PipeRuntimeCriticalException {} from agent, stop pipe {}.";
+ "检测到 agent 上报 PipeRuntimeCriticalException {},停止 pipe {}。";
+ public static final String
DETECT_PIPERUNTIMESINKCRITICALEXCEPTION_FROM_AGENT_STOP_PIPE =
+ "检测到 agent 上报 PipeRuntimeSinkCriticalException {},停止 pipe {}。";
public static final String ENABLE_SEPARATION_OF_POWERS_IS_NOT_SUPPORTED =
"不支持启用权力分离";
public static final String ENDEXECUTECQ_TIME_RANGE_IS_CURRENT_TIME_IS =
"[EndExecuteCQ] {}, time range is [{}, {}), current time is {}";
@@ -112,9 +118,9 @@ public final class ManagerMessages {
public static final String
FAILED_TO_CLOSE_CONSUMER_IN_CONSUMER_GROUP_RESULT_STATUS =
"Failed to close consumer {} in consumer group {}. Result status: {}.";
public static final String
FAILED_TO_CLOSE_EXTRACTOR_AFTER_FAILED_TO_INITIALIZE_EXTRACTOR =
- "Failed to close extractor after failed to initialize extractor. ";
+ "初始化 extractor 失败后关闭 extractor 失败,忽略此异常。";
public static final String
FAILED_TO_CLOSE_SINK_AFTER_FAILED_TO_INITIALIZE_IT_IGNORE =
- "Failed to close sink after failed to initialize it. Ignore this
exception.";
+ "初始化 sink 失败后关闭 sink 失败,忽略此异常。";
public static final String FAILED_TO_COLLECT_COMMITCREATETABLEPLAN =
"Failed to collect CommitCreateTablePlan";
public static final String
FAILED_TO_COLLECT_PIPE_META_LIST_FROM_CONFIG_NODE_TASK =
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 7321431a12f..c23620f9e75 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.commons.conf.ConfigurationFileUtils;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.conf.TrimProperties;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
+import org.apache.iotdb.commons.pipe.config.PipeDescriptor;
+import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.i18n.ConfigNodeMessages;
@@ -826,12 +828,18 @@ public class ConfigNodeDescriptor {
}
}
- public void loadHotModifiedProps(TrimProperties properties) {
+ public void loadHotModifiedProps(TrimProperties properties) throws
IOException {
ConfigurationFileUtils.updateAppliedProperties(properties, true);
Optional.ofNullable(properties.getProperty(IoTDBConstant.CLUSTER_NAME))
.ifPresent(conf::setClusterName);
Optional.ofNullable(properties.getProperty("enable_topology_probing"))
.ifPresent(v ->
conf.setEnableTopologyProbing(Boolean.parseBoolean(v)));
+ loadPipeHotModifiedProp(properties);
+ }
+
+ private void loadPipeHotModifiedProp(TrimProperties properties) throws
IOException {
+ PipeDescriptor.loadPipeProps(commonDescriptor.getConfig(), properties,
true);
+ PipePeriodicalLogReducer.update();
}
public static ConfigNodeDescriptor getInstance() {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
index ca98f4ec9e8..4726b50d899 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java
@@ -26,6 +26,8 @@ import
org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalPhantomReferenc
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
+import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.confignode.i18n.ManagerMessages;
@@ -56,6 +58,7 @@ public class PipeConfigNodeRuntimeAgent implements IService {
@Override
public synchronized void start() {
PipeConfig.getInstance().printAllConfigs();
+ PipeLogger.setLogger(PipePeriodicalLogReducer::log);
// PipeTasks will not be started here and will be started by
"HandleLeaderChange"
// procedure when the consensus layer notify leader ready
@@ -142,19 +145,21 @@ public class PipeConfigNodeRuntimeAgent implements
IService {
if (event.getPipeTaskMeta() != null) {
report(event.getPipeTaskMeta(), pipeRuntimeException);
} else {
- LOGGER.warn(
-
ManagerMessages.ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A_NULL_PIPETASKMETA,
- pipeRuntimeException);
+ PipeLogger.log(
+ LOGGER::warn,
+ pipeRuntimeException,
+
ManagerMessages.ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A_NULL_PIPETASKMETA);
}
}
private void report(
final PipeTaskMeta pipeTaskMeta, final PipeRuntimeException
pipeRuntimeException) {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
+ pipeRuntimeException,
ManagerMessages.REPORT_PIPERUNTIMEEXCEPTION_TO_LOCAL_PIPETASKMETA_EXCEPTION_MESSAGE,
pipeTaskMeta,
- pipeRuntimeException.getMessage(),
- pipeRuntimeException);
+ pipeRuntimeException.getMessage());
pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
index fd1a8490630..854f39c22bd 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
@@ -32,6 +32,7 @@ import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnviro
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.confignode.i18n.ManagerMessages;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
import
org.apache.iotdb.confignode.manager.pipe.metric.sink.PipeConfigRegionSinkMetrics;
@@ -105,10 +106,10 @@ public class PipeConfigNodeSubtask extends
PipeAbstractSinkSubtask {
try {
source.close();
} catch (Exception closeException) {
- LOGGER.warn(
-
ManagerMessages.FAILED_TO_CLOSE_EXTRACTOR_AFTER_FAILED_TO_INITIALIZE_EXTRACTOR
- + "Ignore this exception.",
- closeException);
+ PipeLogger.log(
+ LOGGER::warn,
+ closeException,
+
ManagerMessages.FAILED_TO_CLOSE_EXTRACTOR_AFTER_FAILED_TO_INITIALIZE_EXTRACTOR);
}
throw e;
}
@@ -154,9 +155,11 @@ public class PipeConfigNodeSubtask extends
PipeAbstractSinkSubtask {
try {
outputPipeSink.close();
} catch (final Exception closeException) {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
+ closeException,
ManagerMessages.FAILED_TO_CLOSE_SINK_AFTER_FAILED_TO_INITIALIZE_IT_IGNORE,
- closeException);
+ closeException.getMessage());
}
throw e;
}
@@ -208,19 +211,19 @@ public class PipeConfigNodeSubtask extends
PipeAbstractSinkSubtask {
try {
source.close();
} catch (final Exception e) {
- LOGGER.info(ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPEEXTRACTOR,
e);
+ PipeLogger.log(LOGGER::info, e,
ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPEEXTRACTOR);
}
try {
processor.close();
} catch (final Exception e) {
- LOGGER.info(ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPEPROCESSOR,
e);
+ PipeLogger.log(LOGGER::info, e,
ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPEPROCESSOR);
}
try {
outputPipeSink.close();
} catch (final Exception e) {
- LOGGER.info(ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPECONNECTOR,
e);
+ PipeLogger.log(LOGGER::info, e,
ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPECONNECTOR);
} finally {
// Should be after connector.close()
super.close();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
index 941d6d6d06c..0d348e16417 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.confignode.i18n.ManagerMessages;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.ProcedureManager;
@@ -95,7 +96,8 @@ public class PipeMetaSyncer {
isLastPipeSyncSuccessful = false;
if (configManager.getPipeManager().getPipeTaskCoordinator().isLocked()) {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
ManagerMessages.PIPETASKCOORDINATORLOCK_IS_HELD_BY_ANOTHER_THREAD_SKIP_THIS_ROUND_OF_2);
return;
}
@@ -109,7 +111,9 @@ public class PipeMetaSyncer {
==
PipeConfig.getInstance().getPipeMetaSyncerAutoRestartPipeCheckIntervalRound()) {
somePipesNeedRestarting = autoRestartWithLock();
if (somePipesNeedRestarting) {
-
LOGGER.info(ManagerMessages.SOME_PIPES_NEED_RESTARTING_WILL_RESTART_THEM_AFTER_THIS_SYNC);
+ PipeLogger.log(
+ LOGGER::info,
+
ManagerMessages.SOME_PIPES_NEED_RESTARTING_WILL_RESTART_THEM_AFTER_THIS_SYNC);
}
pipeAutoRestartRoundCounter.set(0);
}
@@ -130,19 +134,22 @@ public class PipeMetaSyncer {
if (handleMetaChangeStatus.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
successfulSync = true;
} else {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
ManagerMessages.FAILED_TO_HANDLE_PIPE_META_CHANGE_RESULT_STATUS,
handleMetaChangeStatus);
}
}
if (successfulSync) {
- LOGGER.info(
+ PipeLogger.log(
+ LOGGER::info,
ManagerMessages.AFTER_THIS_SUCCESSFUL_SYNC_IF_PIPETASKINFO_IS_EMPTY_DURING_THIS);
isLastPipeSyncSuccessful = true;
}
} else {
- LOGGER.warn(ManagerMessages.FAILED_TO_SYNC_PIPE_META_RESULT_STATUS,
metaSyncStatus);
+ PipeLogger.log(
+ LOGGER::warn,
ManagerMessages.FAILED_TO_SYNC_PIPE_META_RESULT_STATUS, metaSyncStatus);
}
}
@@ -158,7 +165,8 @@ public class PipeMetaSyncer {
final AtomicReference<PipeTaskInfo> pipeTaskInfo =
configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
if (pipeTaskInfo == null) {
-
LOGGER.warn(ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_AUTO_RESTART_PIPE_TASK);
+ PipeLogger.log(
+ LOGGER::warn,
ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_AUTO_RESTART_PIPE_TASK);
return false;
}
try {
@@ -176,7 +184,7 @@ public class PipeMetaSyncer {
.getRegionMaintainHandler()
.checkAndRepairConsensusPipes();
} catch (Exception e) {
- LOGGER.warn(ManagerMessages.FAILED_TO_CHECK_AND_REPAIR_CONSENSUS_PIPES,
e);
+ PipeLogger.log(LOGGER::warn, e,
ManagerMessages.FAILED_TO_CHECK_AND_REPAIR_CONSENSUS_PIPES);
}
}
@@ -184,7 +192,9 @@ public class PipeMetaSyncer {
final AtomicReference<PipeTaskInfo> pipeTaskInfo =
configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
if (pipeTaskInfo == null) {
-
LOGGER.warn(ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_HANDLING_SUCCESSFUL_RESTART);
+ PipeLogger.log(
+ LOGGER::warn,
+
ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_HANDLING_SUCCESSFUL_RESTART);
return false;
}
try {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index 597120ffe1c..97c6795daf4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
import org.apache.iotdb.confignode.i18n.ManagerMessages;
import org.apache.iotdb.confignode.manager.ConfigManager;
@@ -93,7 +94,8 @@ public class PipeHeartbeatParser {
final AtomicReference<PipeTaskInfo> pipeTaskInfo =
configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
if (pipeTaskInfo == null) {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
ManagerMessages.FAILED_TO_ACQUIRE_LOCK_WHEN_PARSEHEARTBEAT_FROM_NODE_ID,
nodeId);
return;
@@ -127,8 +129,10 @@ public class PipeHeartbeatParser {
configManager.getNodeManager().getRegisteredDataNodeCount()
+ (PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled() ? 1
: 0);
if (expectedNodeCount <= 0) {
- LOGGER.warn(
- ManagerMessages.EXPECTED_PIPE_HEARTBEAT_NODE_COUNT_IS_FALLBACK_TO_1,
expectedNodeCount);
+ PipeLogger.log(
+ LOGGER::warn,
+ ManagerMessages.EXPECTED_PIPE_HEARTBEAT_NODE_COUNT_IS_FALLBACK_TO_1,
+ expectedNodeCount);
return 1;
}
return expectedNodeCount;
@@ -142,10 +146,6 @@ public class PipeHeartbeatParser {
final PipeStaticMeta staticMeta =
pipeMetaFromCoordinator.getStaticMeta();
final PipeMeta pipeMetaFromAgent = pipeHeartbeat.getPipeMeta(staticMeta);
if (pipeMetaFromAgent == null) {
- LOGGER.info(
-
ManagerMessages.PIPERUNTIMECOORDINATOR_MEETS_ERROR_IN_UPDATING_PIPEMETAKEEPER
- + "pipeMetaFromAgent is null, pipeMetaFromCoordinator: {}",
- pipeMetaFromCoordinator);
continue;
}
@@ -157,8 +157,9 @@ public class PipeHeartbeatParser {
if (Boolean.TRUE.equals(isPipeCompletedFromAgent)) {
temporaryMeta.markDataNodeCompleted(nodeId);
- LOGGER.info(
- "Detected historical pipe completion report from DataNode {} for
pipe {}. remainingEventCount: {}, remainingTime: {}, completedDataNodes: {}",
+ PipeLogger.log(
+ LOGGER::info,
+
ManagerMessages.DETECTED_HISTORICAL_PIPE_COMPLETION_REPORT_FROM_DATANODE,
nodeId,
staticMeta.getPipeName(),
pipeHeartbeat.getRemainingEventCount(staticMeta),
@@ -169,14 +170,16 @@ public class PipeHeartbeatParser {
configManager.getNodeManager().getRegisteredDataNodeLocations().keySet();
uncompletedDataNodeIds.removeAll(temporaryMeta.getCompletedDataNodeIds());
if (uncompletedDataNodeIds.isEmpty()) {
- LOGGER.info(
- "All DataNodes reported historical pipe {} completed.
globalRemainingEventCount: {}, globalRemainingTime: {}, staticMeta: {}",
+ PipeLogger.log(
+ LOGGER::info,
+ ManagerMessages.ALL_DATANODES_REPORTED_HISTORICAL_PIPE_COMPLETED,
staticMeta.getPipeName(),
temporaryMeta.getGlobalRemainingEvents(),
temporaryMeta.getGlobalRemainingTime(),
staticMeta);
pipeTaskInfo.get().removePipeMeta(staticMeta.getPipeName());
- LOGGER.info(
+ PipeLogger.log(
+ LOGGER::info,
ManagerMessages.DETECTED_COMPLETION_OF_PIPE_STATIC_META_REMOVE_IT,
staticMeta.getPipeName(),
staticMeta);
@@ -267,7 +270,9 @@ public class PipeHeartbeatParser {
needWriteConsensusOnConfigNodes.set(true);
needPushPipeMetaToDataNodes.set(false);
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
+ exception,
ManagerMessages.DETECT_PIPERUNTIMECRITICALEXCEPTION_FROM_AGENT_STOP_PIPE,
exception,
pipeName);
@@ -296,11 +301,13 @@ public class PipeHeartbeatParser {
needWriteConsensusOnConfigNodes.set(true);
needPushPipeMetaToDataNodes.set(false);
- LOGGER.warn(
- String.format(
- "Detect
PipeRuntimeConnectorCriticalException %s "
- + "from agent, stop pipe %s.",
- exception, pipeName));
+ PipeLogger.log(
+ LOGGER::warn,
+ exception,
+ ManagerMessages
+
.DETECT_PIPERUNTIMESINKCRITICALEXCEPTION_FROM_AGENT_STOP_PIPE,
+ exception,
+ pipeName);
});
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
index bbdaee8e12c..91d36f4e441 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
import
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
import
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
@@ -85,7 +86,8 @@ public class PipeHeartbeatScheduler {
}
if (configManager.getPipeManager().getPipeTaskCoordinator().isLocked()) {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
ManagerMessages.PIPETASKCOORDINATORLOCK_IS_HELD_BY_ANOTHER_THREAD_SKIP_THIS_ROUND_OF);
return;
}
@@ -130,7 +132,8 @@ public class PipeHeartbeatScheduler {
configNodeResp.getPipeRemainingEventCountList(),
configNodeResp.getPipeRemainingTimeList()));
} catch (final Exception e) {
-
LOGGER.warn(ManagerMessages.FAILED_TO_COLLECT_PIPE_META_LIST_FROM_CONFIG_NODE_TASK,
e);
+ PipeLogger.log(
+ LOGGER::warn, e,
ManagerMessages.FAILED_TO_COLLECT_PIPE_META_LIST_FROM_CONFIG_NODE_TASK);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index 1d4d10006b6..490c3f03892 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.confignode.manager.pipe.coordinator.task;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import
org.apache.iotdb.confignode.consensus.request.read.pipe.task.ShowPipePlanV2;
import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
import org.apache.iotdb.confignode.i18n.ConfigNodeMessages;
@@ -215,7 +216,10 @@ public class PipeTaskCoordinator {
.filter(req.whereClause, req.pipeName, req.isTableModel,
req.userName)
.convertToTShowPipeResp();
} catch (final ConsensusException e) {
-
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_READ_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
e);
+ PipeLogger.log(
+ LOGGER::warn,
+ e,
+
ConfigNodeMessages.FAILED_IN_THE_READ_API_EXECUTING_THE_CONSENSUS_LAYER_DUE);
final TSStatus res = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
return new PipeTableResp(res,
Collections.emptyList()).convertToTShowPipeResp();
@@ -227,7 +231,7 @@ public class PipeTaskCoordinator {
return ((PipeTableResp) configManager.getConsensusManager().read(new
ShowPipePlanV2()))
.convertToTGetAllPipeInfoResp();
} catch (IOException | ConsensusException e) {
- LOGGER.warn(ManagerMessages.FAILED_TO_GET_ALL_PIPE_INFO, e);
+ PipeLogger.log(LOGGER::warn, e,
ManagerMessages.FAILED_TO_GET_ALL_PIPE_INFO);
return new TGetAllPipeInfoResp(
new
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage()),
Collections.emptyList());
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index 0f788435394..15991cfc90e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.manager.pipe.coordinator.task;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.confignode.i18n.ManagerMessages;
import org.slf4j.Logger;
@@ -53,7 +54,8 @@ public class PipeTaskCoordinatorLock {
Thread.currentThread().getName());
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
- LOGGER.error(
+ PipeLogger.log(
+ LOGGER::error,
ManagerMessages.INTERRUPTED_WHILE_WAITING_FOR_PIPETASKCOORDINATOR_LOCK_CURRENT_THREAD,
Thread.currentThread().getName());
}
@@ -70,14 +72,16 @@ public class PipeTaskCoordinatorLock {
Thread.currentThread().getName());
return true;
} else {
- LOGGER.info(
+ PipeLogger.log(
+ LOGGER::info,
ManagerMessages.PIPETASKCOORDINATOR_LOCK_FAILED_TO_ACQUIRE_BY_THREAD_BECAUSE_OF_TIMEOUT,
Thread.currentThread().getName());
return false;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- LOGGER.error(
+ PipeLogger.log(
+ LOGGER::error,
ManagerMessages.INTERRUPTED_WHILE_WAITING_FOR_PIPETASKCOORDINATOR_LOCK_CURRENT_THREAD,
Thread.currentThread().getName());
return false;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index d4117080353..3c2e331b0a1 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
@@ -636,7 +637,8 @@ public class PipeTaskInfo implements SnapshotProcessor {
// external source pipe tasks are not balanced
here since non-leaders
// don't know about RegionLeader Map and will be
balanced in the meta
// sync procedure
- LOGGER.info(
+ PipeLogger.log(
+ LOGGER::info,
ConfigNodeMessages.PIPE_IS_USING_EXTERNAL_SOURCE_SKIP_REGION,
pipeMeta.getStaticMeta().getPipeName(),
plan.getConsensusGroupId2NewLeaderIdMap());
@@ -905,7 +907,10 @@ public class PipeTaskInfo implements SnapshotProcessor {
});
if (needRestart.get()) {
-
LOGGER.info(ConfigNodeMessages.PIPEMETASYNCER_IS_TRYING_TO_RESTART_THE_PIPES,
pipeToRestart);
+ PipeLogger.log(
+ LOGGER::info,
+ ConfigNodeMessages.PIPEMETASYNCER_IS_TRYING_TO_RESTART_THE_PIPES,
+ pipeToRestart);
}
return needRestart.get();
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index 2c5b9566fce..93653923375 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
import org.apache.iotdb.confignode.i18n.ConfigNodeMessages;
import org.apache.iotdb.confignode.i18n.ProcedureMessages;
@@ -70,7 +71,8 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur
@Override
public boolean executeFromValidateTask(ConfigNodeProcedureEnv env) {
- LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMVALIDATETASK);
+ PipeLogger.log(
+ LOGGER::info, ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMVALIDATETASK);
// Nothing needs to be checked
return true;
@@ -78,14 +80,18 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur
@Override
public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
- LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMCALCULATEINFOFORTASK);
+ PipeLogger.log(
+ LOGGER::info,
+ ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMCALCULATEINFOFORTASK);
// Nothing needs to be calculated
}
@Override
public void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
- LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMHANDLEONCONFIGNODES);
+ PipeLogger.log(
+ LOGGER::info,
+ ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMHANDLEONCONFIGNODES);
final Map<TConsensusGroupId, Integer> newConsensusGroupIdToLeaderConsensusIdMap =
new HashMap<>();
@@ -100,7 +106,10 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur
try {
response = env.getConfigManager().getConsensusManager().write(pipeHandleLeaderChangePlan);
} catch (ConsensusException e) {
- LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e);
+ PipeLogger.log(
+ LOGGER::warn,
+ e,
+ ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE);
response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
response.setMessage(e.getMessage());
}
@@ -111,35 +120,44 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur
@Override
public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
- LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMHANDLEONDATANODES);
+ PipeLogger.log(
+ LOGGER::info,
+ ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMHANDLEONDATANODES);
pushPipeMetaToDataNodesIgnoreException(env);
}
@Override
public void rollbackFromValidateTask(ConfigNodeProcedureEnv env) {
- LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMVALIDATETASK);
+ PipeLogger.log(
+ LOGGER::info, ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMVALIDATETASK);
// Nothing to do
}
@Override
public void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
- LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMCALCULATEINFOFORTASK);
+ PipeLogger.log(
+ LOGGER::info,
+ ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMCALCULATEINFOFORTASK);
// Nothing to do
}
@Override
public void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
- LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMHANDLEONCONFIGNODES);
+ PipeLogger.log(
+ LOGGER::info,
+ ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMHANDLEONCONFIGNODES);
// Nothing to do
}
@Override
public void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
- LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMCREATEONDATANODES);
+ PipeLogger.log(
+ LOGGER::info,
+ ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMCREATEONDATANODES);
// Nothing to do
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index a0dec367352..394f7e2fa9f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
import org.apache.iotdb.confignode.i18n.ConfigNodeMessages;
import org.apache.iotdb.confignode.i18n.ProcedureMessages;
@@ -80,7 +81,8 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV
@Override
public boolean executeFromValidateTask(ConfigNodeProcedureEnv env) {
- LOGGER.info(ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_EXECUTEFROMVALIDATETASK);
+ PipeLogger.log(
+ LOGGER::info, ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_EXECUTEFROMVALIDATETASK);
// Do nothing
return true;
@@ -88,14 +90,17 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV
@Override
public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
- LOGGER.info(ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_EXECUTEFROMCALCULATEINFOFORTASK);
+ PipeLogger.log(
+ LOGGER::info,
+ ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_EXECUTEFROMCALCULATEINFOFORTASK);
// Do nothing
}
@Override
public void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
- LOGGER.info(
+ PipeLogger.log(
+ LOGGER::info,
ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_EXECUTEFROMWRITECONFIGNODECONSENSUS);
if (!needWriteConsensusOnConfigNodes) {
@@ -114,7 +119,10 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV
.getConsensusManager()
.write(new PipeHandleMetaChangePlan(pipeMetaList));
} catch (ConsensusException e) {
- LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e);
+ PipeLogger.log(
+ LOGGER::warn,
+ e,
+ ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE);
response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
response.setMessage(e.getMessage());
}
@@ -125,7 +133,8 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV
@Override
public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
- LOGGER.info(ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_EXECUTEFROMHANDLEONDATANODES);
+ PipeLogger.log(
+ LOGGER::info, ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_EXECUTEFROMHANDLEONDATANODES);
if (!needPushPipeMetaToDataNodes) {
return;
@@ -136,21 +145,25 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV
@Override
public void rollbackFromValidateTask(ConfigNodeProcedureEnv env) {
- LOGGER.info(ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_ROLLBACKFROMVALIDATETASK);
+ PipeLogger.log(
+ LOGGER::info, ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_ROLLBACKFROMVALIDATETASK);
// Do nothing
}
@Override
public void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
- LOGGER.info(ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_ROLLBACKFROMCALCULATEINFOFORTASK);
+ PipeLogger.log(
+ LOGGER::info,
+ ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_ROLLBACKFROMCALCULATEINFOFORTASK);
// Do nothing
}
@Override
public void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) {
- LOGGER.info(
+ PipeLogger.log(
+ LOGGER::info,
ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_ROLLBACKFROMWRITECONFIGNODECONSENSUS);
// Do nothing
@@ -158,7 +171,9 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV
@Override
public void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
- LOGGER.info(ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_ROLLBACKFROMOPERATEONDATANODES);
+ PipeLogger.log(
+ LOGGER::info,
+ ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_ROLLBACKFROMOPERATEONDATANODES);
// Do nothing
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index 33b909eae27..072555146c1 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
import org.apache.iotdb.confignode.i18n.ConfigNodeMessages;
import org.apache.iotdb.confignode.i18n.ProcedureMessages;
@@ -187,7 +188,10 @@ public class PipeMetaSyncProcedure extends AbstractOperatePipeProcedureV2 {
.getConsensusManager()
.write(new PipeHandleMetaChangePlan(pipeMetaList));
} catch (ConsensusException e) {
- LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e);
+ PipeLogger.log(
+ LOGGER::warn,
+ e,
+ ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE);
response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
response.setMessage(e.getMessage());
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 8ae7e7eb610..068d44d0540 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.conf.TrimProperties;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.memory.MemoryManager;
import org.apache.iotdb.commons.pipe.config.PipeDescriptor;
+import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
@@ -37,7 +38,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
import org.apache.iotdb.consensus.config.IoTConsensusV2Config;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.i18n.DataNodeMiscMessages;
-import org.apache.iotdb.db.pipe.resource.log.PipePeriodicalLogReducer;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.LastCacheLoadStrategy;
import org.apache.iotdb.db.service.metrics.IoTDBInternalLocalReporter;
import org.apache.iotdb.db.storageengine.StorageEngine;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
index 5e75199e388..3cdf92d5392 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
+import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.utils.TestOnly;
@@ -42,7 +43,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner;
-import org.apache.iotdb.db.pipe.resource.log.PipePeriodicalLogReducer;
import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
@@ -214,16 +214,20 @@ public class PipeDataNodeRuntimeAgent implements IService {
if (event.getPipeTaskMeta() != null) {
report(event.getPipeTaskMeta(), pipeRuntimeException);
} else {
- LOGGER.warn(DataNodePipeMessages.ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A, pipeRuntimeException);
+ PipeLogger.log(
+ LOGGER::warn,
+ pipeRuntimeException,
+ DataNodePipeMessages.ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A);
}
}
public void report(PipeTaskMeta pipeTaskMeta, PipeRuntimeException pipeRuntimeException) {
- LOGGER.warn(
+ PipeLogger.log(
+ LOGGER::warn,
+ pipeRuntimeException,
DataNodePipeMessages.REPORT_PIPERUNTIMEEXCEPTION_TO_LOCAL_PIPETASKMETA_EXCEPTION_MESSAGE,
pipeTaskMeta,
- pipeRuntimeException.getMessage(),
- pipeRuntimeException);
+ pipeRuntimeException.getMessage());
// Quick stop all pipes locally if critical exception occurs,
// no need to wait for the next heartbeat cycle.
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index e6c8ddefc99..f4cb3b281e9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
+import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer;
import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqHandler;
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
@@ -61,7 +62,6 @@ import org.apache.iotdb.db.pipe.receiver.visitor.PipeTableStatementDataTypeConve
import org.apache.iotdb.db.pipe.receiver.visitor.PipeTreeStatementDataTypeConvertExecutionVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeTreeStatementToBatchVisitor;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
-import org.apache.iotdb.db.pipe.resource.log.PipePeriodicalLogReducer;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipePeriodicalLogReducer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipePeriodicalLogReducer.java
deleted file mode 100644
index 946450192c5..00000000000
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipePeriodicalLogReducer.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.resource.log;
-
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.db.i18n.DataNodePipeMessages;
-import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
-import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
-
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import org.apache.tsfile.utils.RamUsageEstimator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-
-public class PipePeriodicalLogReducer {
- private static final Logger LOGGER = LoggerFactory.getLogger(PipePeriodicalLogReducer.class);
- private static final PipeMemoryBlock block;
- protected static final Cache<String, String> loggerCache;
-
- static {
- // Never close because it's static
- block =
- PipeDataNodeResourceManager.memory()
- .tryAllocate(PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes());
- loggerCache =
- Caffeine.newBuilder()
- .expireAfterWrite(
- PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds(), TimeUnit.SECONDS)
- .weigher(
- (k, v) ->
- Math.toIntExact(
- RamUsageEstimator.sizeOf((String) k)
- + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY))
- .maximumWeight(block.getMemoryUsageInBytes())
- .build();
- }
-
- public static boolean log(
- final Consumer<String> loggerFunction, final String rawMessage, final Object... formatter) {
- final String loggerMessage = String.format(rawMessage, formatter);
- if (!loggerCache.asMap().containsKey(loggerMessage)) {
- loggerCache.put(loggerMessage, loggerMessage);
- loggerFunction.accept(loggerMessage);
- return true;
- }
- return false;
- }
-
- public static void update() {
- loggerCache
- .policy()
- .expireAfterWrite()
- .ifPresent(
- time ->
- time.setExpiresAfter(
- PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds(),
- TimeUnit.SECONDS));
- PipeDataNodeResourceManager.memory()
- .resize(block, PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes(), false);
- LOGGER.info(
- DataNodePipeMessages.PIPEPERIODICALLOGREDUCER_IS_ALLOCATED_TO_BYTES,
- block.getMemoryUsageInBytes());
- loggerCache
- .policy()
- .eviction()
- .ifPresent(eviction -> eviction.setMaximum(block.getMemoryUsageInBytes()));
- }
-
- private PipePeriodicalLogReducer() {
- // static
- }
-}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogger.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogger.java
index 9d7be703c9c..97ff4afba63 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogger.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogger.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.commons.pipe.resource.log;
+import org.slf4j.helpers.MessageFormatter;
+
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.function.Consumer;
@@ -26,11 +28,11 @@ import java.util.function.Consumer;
public class PipeLogger {
private static PipePeriodicalLogger logger =
(loggerFunction, rawMessage, formatter) ->
- loggerFunction.accept(String.format(rawMessage, formatter));
+ loggerFunction.accept(formatMessage(rawMessage, formatter));
public static void log(
final Consumer<String> loggerFunction, final String rawMessage, final Object... formatter) {
- logger.log(loggerFunction, "%s", format(rawMessage, formatter));
+ logger.log(loggerFunction, "%s", formatMessage(rawMessage, formatter));
}
public static void log(
@@ -40,7 +42,7 @@ public class PipeLogger {
final Object... formatter) {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
throwable.printStackTrace(new PrintStream(out));
- logger.log(loggerFunction, "%s", format(rawMessage, formatter) + "\n" + out);
+ logger.log(loggerFunction, "%s", formatMessage(rawMessage, formatter) + "\n" + out);
}
public static void setLogger(final PipePeriodicalLogger logger) {
@@ -51,10 +53,14 @@ public class PipeLogger {
// static
}
- private static String format(final String rawMessage, final Object... formatter) {
- return formatter == null || formatter.length == 0
- ? rawMessage
- : String.format(rawMessage, formatter);
+ static String formatMessage(final String rawMessage, final Object... formatter) {
+ if (formatter == null || formatter.length == 0) {
+ return rawMessage;
+ }
+ if (rawMessage.contains("{}")) {
+ return MessageFormatter.arrayFormat(rawMessage, formatter).getMessage();
+ }
+ return String.format(rawMessage, formatter);
}
@FunctionalInterface
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducer.java
new file mode 100644
index 00000000000..30074f90ebf
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.resource.log;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+public class PipePeriodicalLogReducer {
+
+ protected static final Cache<String, String> LOGGER_CACHE =
+ Caffeine.newBuilder()
+ .expireAfterWrite(
+ PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds(), TimeUnit.SECONDS)
+ .weigher(PipePeriodicalLogReducer::estimateSize)
+ .maximumWeight(PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes())
+ .build();
+
+ private static int estimateSize(final String key, final String value) {
+ return Math.toIntExact(
+ RamUsageEstimator.sizeOf(key) + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY);
+ }
+
+ public static boolean log(
+ final Consumer<String> loggerFunction, final String rawMessage, final Object... formatter) {
+ final String loggerMessage = PipeLogger.formatMessage(rawMessage, formatter);
+ if (!LOGGER_CACHE.asMap().containsKey(loggerMessage)) {
+ LOGGER_CACHE.put(loggerMessage, loggerMessage);
+ loggerFunction.accept(loggerMessage);
+ return true;
+ }
+ return false;
+ }
+
+ public static void update() {
+ update(PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes());
+ }
+
+ public static void update(final long maxWeight) {
+ LOGGER_CACHE
+ .policy()
+ .expireAfterWrite()
+ .ifPresent(
+ time ->
+ time.setExpiresAfter(
+ PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds(),
+ TimeUnit.SECONDS));
+ LOGGER_CACHE.policy().eviction().ifPresent(eviction -> eviction.setMaximum(maxWeight));
+ }
+
+ private PipePeriodicalLogReducer() {
+ // static
+ }
+}
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/resource/PipeLoggerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/resource/PipeLoggerTest.java
index 122dfea7ca1..d37c109c9f5 100644
--- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/resource/PipeLoggerTest.java
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/resource/PipeLoggerTest.java
@@ -42,6 +42,20 @@ public class PipeLoggerTest {
Assert.assertEquals("data_{sink.password=%/broken}", message.get());
}
+ @Test
+ public void testLogMessageWithSlf4jPlaceholder() {
+ final AtomicReference<String> message = new AtomicReference<>();
+
+ setStringFormatLogger();
+ try {
+ PipeLogger.log(message::set, "PipeLoggerCacheMaxSizeInBytes: {}", 1024);
+ } finally {
+ setStringFormatLogger();
+ }
+
+ Assert.assertEquals("PipeLoggerCacheMaxSizeInBytes: 1024", message.get());
+ }
+
@Test
public void testLogThrowableWithPercentInStackTrace() {
final AtomicReference<String> message = new AtomicReference<>();