This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 15f63300fb7 Pipe: support restarting pipes on datanodes when they are
stuck (#11955)
15f63300fb7 is described below
commit 15f63300fb78fd2296625e598e796191c43e1de6
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Jan 23 17:27:29 2024 +0800
Pipe: support restarting pipes on datanodes when they are stuck (#11955)
Co-authored-by: Caideyipi <[email protected]>
---
.../db/pipe/agent/runtime/PipeRuntimeAgent.java | 5 ++
.../db/pipe/agent/task/PipeTaskDataNodeAgent.java | 77 ++++++++++++++++++++++
.../pipe/extractor/IoTDBDataRegionExtractor.java | 11 ++++
.../PipeRealtimeDataRegionHybridExtractor.java | 9 ++-
.../iotdb/db/pipe/metric/PipeExtractorMetrics.java | 7 +-
.../subtask/connector/PipeConnectorSubtask.java | 8 +--
.../apache/iotdb/commons/conf/CommonConfig.java | 28 ++++++--
.../iotdb/commons/conf/CommonDescriptor.java | 10 +++
.../commons/pipe/agent/task/PipeTaskAgent.java | 4 +-
.../iotdb/commons/pipe/config/PipeConfig.java | 15 ++++-
10 files changed, 157 insertions(+), 17 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 2414613ad96..75997e483e2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -73,6 +73,11 @@ public class PipeRuntimeAgent implements IService {
public synchronized void start() throws StartupException {
PipeConfig.getInstance().printAllConfigs();
PipeAgentLauncher.launchPipeTaskAgent();
+
+ registerPeriodicalJob(
+ "PipeTaskAgent#restartAllStuckPipes",
+ PipeAgent.task()::restartAllStuckPipes,
+ PipeConfig.getInstance().getPipeStuckRestartIntervalSeconds());
pipePeriodicalJobExecutor.start();
isShutdown.set(false);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
index 15741629389..a42adb7cdc8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalExcep
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.task.PipeTask;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
@@ -33,11 +34,15 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.extractor.IoTDBDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
+import org.apache.iotdb.db.pipe.metric.PipeExtractorMetrics;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.pipe.task.PipeDataNodeTask;
import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeBuilder;
import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeTaskDataRegionBuilder;
import
org.apache.iotdb.db.pipe.task.builder.PipeDataNodeTaskSchemaRegionBuilder;
+import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
@@ -52,8 +57,11 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
import static
org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.ConfigRegion;
@@ -309,4 +317,73 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent {
PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true);
}
+
+ ///////////////////////// Restart Logic /////////////////////////
+
+  /**
+   * Scans the pipes on this data node and restarts the ones that may be stuck.
+   *
+   * <p>Registered as the periodical job {@code PipeTaskAgent#restartAllStuckPipes} by {@code
+   * PipeRuntimeAgent#start}. The scan only runs when the write lock can be obtained through
+   * {@code tryWriteLockWithTimeOut(5)}; otherwise this round is skipped silently. The lock is
+   * released in a {@code finally} block, so it is given back even if the scan throws.
+   */
+  public void restartAllStuckPipes() {
+    // NOTE(review): the timeout argument is presumably in seconds - confirm against
+    // tryWriteLockWithTimeOut.
+    if (tryWriteLockWithTimeOut(5)) {
+      try {
+        restartAllStuckPipesInternal();
+      } finally {
+        releaseWriteLock();
+      }
+    }
+  }
+
+  /**
+   * Collects every pipe that may be stuck and restarts it.
+   *
+   * <p>Invoked by {@link #restartAllStuckPipes()} while the write lock is held. A pipe is treated
+   * as possibly stuck when all of the following hold:
+   *
+   * <ul>
+   *   <li>at least one of the node-wide resource indicators (linked tsfile count, pinned
+   *       memtable count, WAL disk usage) has reached its dangerous threshold;
+   *   <li>the pipe has at least one registered data region extractor, and the first one found
+   *       runs in stream mode;
+   *   <li>at least one of its extractors has consumed all historical tsfiles.
+   * </ul>
+   */
+  private void restartAllStuckPipesInternal() {
+    // The resource indicators are node-wide and do not depend on the pipe being inspected, so
+    // they are evaluated once up front (in the original short-circuit order). When none of them
+    // is alarming, no pipe can be reported as stuck and the per-pipe scan is skipped entirely.
+    if (!mayLinkedTsFileCountReachDangerousThreshold()
+        && !mayMemTablePinnedCountReachDangerousThreshold()
+        && !mayWalSizeReachThrottleThreshold()) {
+      return;
+    }
+
+    final Map<String, IoTDBDataRegionExtractor> taskId2ExtractorMap =
+        PipeExtractorMetrics.getInstance().getExtractorMap();
+
+    final Set<PipeMeta> stuckPipes = new HashSet<>();
+    for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
+      final String pipeName = pipeMeta.getStaticMeta().getPipeName();
+      final List<IoTDBDataRegionExtractor> extractors =
+          taskId2ExtractorMap.values().stream()
+              .filter(e -> e.getPipeName().equals(pipeName))
+              .collect(Collectors.toList());
+      if (extractors.isEmpty()
+          || !extractors.get(0).isStreamMode()
+          || extractors.stream()
+              .noneMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles)) {
+        continue;
+      }
+
+      LOGGER.warn("Pipe {} may be stuck.", pipeMeta.getStaticMeta());
+      stuckPipes.add(pipeMeta);
+    }
+
+    // Restart all stuck pipes. The pipes are collected first so that the meta list is not
+    // modified while it is being iterated.
+    // NOTE(review): the restarts run on parallel-stream worker threads while the calling thread
+    // holds the write lock - confirm that the drop / re-create handlers tolerate concurrent
+    // invocation.
+    stuckPipes.parallelStream().forEach(this::restartStuckPipe);
+  }
+
+ private boolean mayLinkedTsFileCountReachDangerousThreshold() {
+ return PipeResourceManager.tsfile().getLinkedTsfileCount()
+ >= 2 * PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
+ }
+
+  /**
+   * Tells whether the number of pinned WAL entries reported by the pipe resource manager has
+   * reached ten times the configured {@code pipeMaxAllowedPinnedMemTableCount}.
+   *
+   * @return {@code true} if the pinned count is at or above ten times the configured limit
+   */
+  private boolean mayMemTablePinnedCountReachDangerousThreshold() {
+    // The configured limit is an int; multiply as long so that a large configured value cannot
+    // overflow and wrap into a small or negative threshold.
+    return PipeResourceManager.wal().getPinnedWalCount()
+        >= 10L * PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount();
+  }
+
+  /**
+   * Tells whether the total WAL disk usage exceeds two thirds of the configured throttle
+   * threshold. The comparison is kept in multiplied form ({@code 3 * usage > 2 * threshold}) so
+   * that no division is needed.
+   *
+   * @return {@code true} if three times the WAL disk usage is greater than twice the threshold
+   */
+  private boolean mayWalSizeReachThrottleThreshold() {
+    return 2 * IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold()
+        < 3 * WALManager.getInstance().getTotalDiskUsage();
+  }
+
+  /**
+   * Restarts a single pipe by dropping it and then re-applying its meta, logging the time spent.
+   *
+   * <p>Both handlers return a response exception message object instead of throwing. A non-null
+   * result is logged here rather than being discarded, so a restart that did not go through is
+   * visible in the log.
+   *
+   * @param pipeMeta meta of the pipe to restart; it is handed back to the meta-change handler to
+   *     re-create the pipe after the drop
+   */
+  private void restartStuckPipe(PipeMeta pipeMeta) {
+    LOGGER.warn("Pipe {} will be restarted because of stuck.", pipeMeta.getStaticMeta());
+    final long startTime = System.currentTimeMillis();
+
+    // NOTE(review): a non-null result presumably describes a failure of the step - confirm
+    // against the handlers in PipeTaskAgent. The results are held as Object because they are
+    // only passed to the logger.
+    final Object dropResult = handleDropPipeInternal(pipeMeta.getStaticMeta().getPipeName());
+    if (dropResult != null) {
+      LOGGER.warn(
+          "Dropping stuck pipe {} reported a problem: {}", pipeMeta.getStaticMeta(), dropResult);
+    }
+
+    final Object recreateResult = handleSinglePipeMetaChangesInternal(pipeMeta);
+    if (recreateResult != null) {
+      LOGGER.warn(
+          "Re-creating stuck pipe {} reported a problem: {}",
+          pipeMeta.getStaticMeta(),
+          recreateResult);
+    }
+
+    LOGGER.warn(
+        "Pipe {} was restarted because of stuck, time cost: {} ms.",
+        pipeMeta.getStaticMeta(),
+        System.currentTimeMillis() - startTime);
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index 58d274b82b1..c13f4071a67 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -372,6 +372,17 @@ public class IoTDBDataRegionExtractor implements
PipeExtractor {
}
}
+ //////////////////////////// APIs provided for detecting stuck
////////////////////////////
+
+  /**
+   * Tells whether this extractor works in stream mode, i.e. whether its realtime extractor is a
+   * {@code PipeRealtimeDataRegionHybridExtractor} or a {@code PipeRealtimeDataRegionLogExtractor}.
+   *
+   * @return {@code true} for the hybrid and log realtime extractors, {@code false} otherwise
+   */
+  public boolean isStreamMode() {
+    if (realtimeExtractor instanceof PipeRealtimeDataRegionHybridExtractor) {
+      return true;
+    }
+    return realtimeExtractor instanceof PipeRealtimeDataRegionLogExtractor;
+  }
+
+  /**
+   * Reports whether the historical extractor has nothing left to supply.
+   *
+   * @return the result of {@code historicalExtractor.hasConsumedAll()}
+   */
+  public boolean hasConsumedAllHistoricalTsFiles() {
+    return historicalExtractor.hasConsumedAll();
+  }
+
//////////////////////////// APIs provided for metric framework
////////////////////////////
public String getTaskID() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 91b786bfffa..568c96b5263 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -224,10 +224,12 @@ public class PipeRealtimeDataRegionHybridExtractor
extends PipeRealtimeDataRegio
// the write operation will be throttled, so we should not extract any
more tablet events.
// 3. The number of pinned memtables has reached the dangerous threshold.
// 4. The number of tsfile events in the pending queue has exceeded the
limit.
+ // 5. The number of linked tsfiles has reached the dangerous threshold.
return !isStartedToSupply
|| mayWalSizeReachThrottleThreshold()
|| mayMemTablePinnedCountReachDangerousThreshold()
- || isTsFileEventCountInQueueExceededLimit();
+ || isTsFileEventCountInQueueExceededLimit()
+ || mayTsFileLinkedCountReachDangerousThreshold();
}
private boolean mayWalSizeReachThrottleThreshold() {
@@ -247,6 +249,11 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
>=
PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
}
+ private boolean mayTsFileLinkedCountReachDangerousThreshold() {
+ return PipeResourceManager.tsfile().getLinkedTsfileCount()
+ >= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
+ }
+
public void informProcessorEventCollectorQueueTsFileSize(int queueSize) {
processorEventCollectorQueueTsFileSize.set(queueSize);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
index f56672c7d6d..b9dfb0c8773 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
@@ -35,7 +35,6 @@ import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -46,7 +45,7 @@ public class PipeExtractorMetrics implements IMetricSet {
private AbstractMetricService metricService;
- private final Map<String, IoTDBDataRegionExtractor> extractorMap = new
HashMap<>();
+ private final Map<String, IoTDBDataRegionExtractor> extractorMap = new
ConcurrentHashMap<>();
private final Map<String, Rate> tabletRateMap = new ConcurrentHashMap<>();
@@ -56,6 +55,10 @@ public class PipeExtractorMetrics implements IMetricSet {
private final Map<String, Gauge> recentProcessedTsFileEpochStateMap = new
ConcurrentHashMap<>();
+  /**
+   * Exposes the registered extractors, keyed by the same string keys used by the internal
+   * registry.
+   *
+   * <p>The returned map is a read-only live view: it reflects later registrations and removals,
+   * but callers cannot modify the registry through it.
+   *
+   * @return an unmodifiable view of the extractor registry
+   */
+  public Map<String, IoTDBDataRegionExtractor> getExtractorMap() {
+    // Fully qualified so that no additional import is required in this file.
+    return java.util.Collections.unmodifiableMap(extractorMap);
+  }
+
//////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 061944af1a0..967ca2d6b86 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -355,19 +355,19 @@ public class PipeConnectorSubtask extends
PipeDataNodeSubtask {
return connectorIndex;
}
- public Integer getTsFileInsertionEventCount() {
+ public int getTsFileInsertionEventCount() {
return inputPendingQueue.getTsFileInsertionEventCount();
}
- public Integer getTabletInsertionEventCount() {
+ public int getTabletInsertionEventCount() {
return inputPendingQueue.getTabletInsertionEventCount();
}
- public Integer getPipeHeartbeatEventCount() {
+ public int getPipeHeartbeatEventCount() {
return inputPendingQueue.getPipeHeartbeatEventCount();
}
- public Integer getAsyncConnectorRetryEventQueueSize() {
+ public int getAsyncConnectorRetryEventQueueSize() {
return outputPipeConnector instanceof IoTDBThriftAsyncConnector
? ((IoTDBThriftAsyncConnector)
outputPipeConnector).getRetryEventQueueSize()
: 0;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index a461452d807..11fd755a8ca 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -185,14 +185,16 @@ public class CommonConfig {
private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 2;
private int pipeMaxAllowedPinnedMemTableCount = 50;
+ private long pipeMaxAllowedLinkedTsFileCount = 100;
+ private long pipeStuckRestartIntervalSeconds = 120;
private boolean pipeMemoryManagementEnabled = true;
private long pipeMemoryAllocateRetryIntervalMs = 1000;
private int pipeMemoryAllocateMaxRetries = 10;
private long pipeMemoryAllocateMinSizeInBytes = 32;
- private long pipeMemoryAllocateForTsFileSequenceReaderInBytes = 2 * 1024 *
1024; // 2MB
- private long pipeMemoryExpanderIntervalSeconds = 3 * 60; // 3Min
- private float PipeLeaderCacheMemoryUsagePercentage = 0.1F;
+ private long pipeMemoryAllocateForTsFileSequenceReaderInBytes = (long) 2 *
1024 * 1024; // 2MB
+ private long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; // 3Min
+ private float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
/** Whether to use persistent schema mode. */
private String schemaEngineMode = "Memory";
@@ -754,6 +756,22 @@ public class CommonConfig {
this.pipeMaxAllowedPinnedMemTableCount = pipeMaxAllowedPinnedMemTableCount;
}
+  /**
+   * Returns the maximum number of linked tsfiles allowed for pipes. The field defaults to 100.
+   *
+   * @return the configured linked tsfile limit
+   */
+  public long getPipeMaxAllowedLinkedTsFileCount() {
+    return this.pipeMaxAllowedLinkedTsFileCount;
+  }
+
+  /**
+   * Sets the maximum number of linked tsfiles allowed for pipes. No validation is applied.
+   *
+   * @param pipeMaxAllowedLinkedTsFileCount the new linked tsfile limit
+   */
+  public void setPipeMaxAllowedLinkedTsFileCount(long pipeMaxAllowedLinkedTsFileCount) {
+    this.pipeMaxAllowedLinkedTsFileCount = pipeMaxAllowedLinkedTsFileCount;
+  }
+
+  /**
+   * Returns the interval, in seconds, configured for the stuck-pipe restart job. The field
+   * defaults to 120.
+   *
+   * @return the configured interval in seconds
+   */
+  public long getPipeStuckRestartIntervalSeconds() {
+    return this.pipeStuckRestartIntervalSeconds;
+  }
+
+  /**
+   * Sets the interval, in seconds, for the stuck-pipe restart job. No validation is applied.
+   *
+   * @param pipeStuckRestartIntervalSeconds the new interval in seconds
+   */
+  public void setPipeStuckRestartIntervalSeconds(long pipeStuckRestartIntervalSeconds) {
+    this.pipeStuckRestartIntervalSeconds = pipeStuckRestartIntervalSeconds;
+  }
+
public boolean getPipeMemoryManagementEnabled() {
return pipeMemoryManagementEnabled;
}
@@ -805,11 +823,11 @@ public class CommonConfig {
}
public float getPipeLeaderCacheMemoryUsagePercentage() {
- return PipeLeaderCacheMemoryUsagePercentage;
+ return pipeLeaderCacheMemoryUsagePercentage;
}
public void setPipeLeaderCacheMemoryUsagePercentage(float
pipeLeaderCacheMemoryUsagePercentage) {
- this.PipeLeaderCacheMemoryUsagePercentage =
pipeLeaderCacheMemoryUsagePercentage;
+ this.pipeLeaderCacheMemoryUsagePercentage =
pipeLeaderCacheMemoryUsagePercentage;
}
public String getSchemaEngineMode() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index ab51e7b6428..43ec8f911b4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -427,6 +427,16 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_max_allowed_pinned_memtable_count",
String.valueOf(config.getPipeMaxAllowedPinnedMemTableCount()))));
+ config.setPipeMaxAllowedLinkedTsFileCount(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_max_allowed_linked_tsfile_count",
+ String.valueOf(config.getPipeMaxAllowedLinkedTsFileCount()))));
+ config.setPipeStuckRestartIntervalSeconds(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_stuck_restart_interval_seconds",
+ String.valueOf(config.getPipeStuckRestartIntervalSeconds()))));
config.setPipeMemoryManagementEnabled(
Boolean.parseBoolean(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 0a32cd85dfc..3b7bf7712f7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -120,7 +120,7 @@ public abstract class PipeTaskAgent {
}
}
- private TPushPipeMetaRespExceptionMessage
handleSinglePipeMetaChangesInternal(
+ protected TPushPipeMetaRespExceptionMessage
handleSinglePipeMetaChangesInternal(
PipeMeta pipeMetaFromCoordinator) {
// Do nothing if node is removing or removed
if (isShutdown()) {
@@ -291,7 +291,7 @@ public abstract class PipeTaskAgent {
}
}
- private TPushPipeMetaRespExceptionMessage handleDropPipeInternal(String
pipeName) {
+ protected TPushPipeMetaRespExceptionMessage handleDropPipeInternal(String
pipeName) {
// Do nothing if node is removing or removed
if (isShutdown()) {
return null;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 32fb5b0fdbb..74a6711a96b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -173,6 +173,14 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeMaxAllowedPinnedMemTableCount();
}
+  /**
+   * Returns the maximum number of linked tsfiles allowed for pipes, as held by the common config.
+   *
+   * @return the value of {@code COMMON_CONFIG.getPipeMaxAllowedLinkedTsFileCount()}
+   */
+  public long getPipeMaxAllowedLinkedTsFileCount() {
+    return COMMON_CONFIG.getPipeMaxAllowedLinkedTsFileCount();
+  }
+
+  /**
+   * Returns the interval, in seconds, of the stuck-pipe restart job, as held by the common config.
+   *
+   * @return the value of {@code COMMON_CONFIG.getPipeStuckRestartIntervalSeconds()}
+   */
+  public long getPipeStuckRestartIntervalSeconds() {
+    return COMMON_CONFIG.getPipeStuckRestartIntervalSeconds();
+  }
+
/////////////////////////////// Memory ///////////////////////////////
public boolean getPipeMemoryManagementEnabled() {
@@ -241,6 +249,8 @@ public class PipeConfig {
LOGGER.info(
"PipeConnectorRPCThriftCompressionEnabled: {}",
isPipeConnectorRPCThriftCompressionEnabled());
+ LOGGER.info(
+ "PipeLeaderCacheMemoryUsagePercentage: {}",
getPipeLeaderCacheMemoryUsagePercentage());
LOGGER.info("PipeAsyncConnectorSelectorNumber: {}",
getPipeAsyncConnectorSelectorNumber());
LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}",
getPipeAsyncConnectorMaxClientNumber());
@@ -264,6 +274,8 @@ public class PipeConfig {
"PipeMaxAllowedPendingTsFileEpochPerDataRegion: {}",
getPipeMaxAllowedPendingTsFileEpochPerDataRegion());
LOGGER.info("PipeMaxAllowedPinnedMemTableCount: {}",
getPipeMaxAllowedPinnedMemTableCount());
+ LOGGER.info("PipeMaxAllowedLinkedTsFileCount: {}",
getPipeMaxAllowedLinkedTsFileCount());
+ LOGGER.info("PipeStuckRestartIntervalSeconds: {}",
getPipeStuckRestartIntervalSeconds());
LOGGER.info("PipeMemoryManagementEnabled: {}",
getPipeMemoryManagementEnabled());
LOGGER.info("PipeMemoryAllocateMaxRetries: {}",
getPipeMemoryAllocateMaxRetries());
@@ -273,9 +285,6 @@ public class PipeConfig {
LOGGER.info(
"PipeMemoryAllocateForTsFileSequenceReaderInBytes: {}",
getPipeMemoryAllocateForTsFileSequenceReaderInBytes());
-
- LOGGER.info(
- "PipeLeaderCacheMemoryUsagePercentage: {}",
getPipeLeaderCacheMemoryUsagePercentage());
}
/////////////////////////////// Singleton ///////////////////////////////