This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 15f63300fb7 Pipe: support restarting pipes on datanodes when they are stuck (#11955)
15f63300fb7 is described below

commit 15f63300fb78fd2296625e598e796191c43e1de6
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Jan 23 17:27:29 2024 +0800

    Pipe: support restarting pipes on datanodes when they are stuck (#11955)
    
    Co-authored-by: Caideyipi <[email protected]>
---
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |  5 ++
 .../db/pipe/agent/task/PipeTaskDataNodeAgent.java  | 77 ++++++++++++++++++++++
 .../pipe/extractor/IoTDBDataRegionExtractor.java   | 11 ++++
 .../PipeRealtimeDataRegionHybridExtractor.java     |  9 ++-
 .../iotdb/db/pipe/metric/PipeExtractorMetrics.java |  7 +-
 .../subtask/connector/PipeConnectorSubtask.java    |  8 +--
 .../apache/iotdb/commons/conf/CommonConfig.java    | 28 ++++++--
 .../iotdb/commons/conf/CommonDescriptor.java       | 10 +++
 .../commons/pipe/agent/task/PipeTaskAgent.java     |  4 +-
 .../iotdb/commons/pipe/config/PipeConfig.java      | 15 ++++-
 10 files changed, 157 insertions(+), 17 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 2414613ad96..75997e483e2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -73,6 +73,11 @@ public class PipeRuntimeAgent implements IService {
   public synchronized void start() throws StartupException {
     PipeConfig.getInstance().printAllConfigs();
     PipeAgentLauncher.launchPipeTaskAgent();
+
+    registerPeriodicalJob(
+        "PipeTaskAgent#restartAllStuckPipes",
+        PipeAgent.task()::restartAllStuckPipes,
+        PipeConfig.getInstance().getPipeStuckRestartIntervalSeconds());
     pipePeriodicalJobExecutor.start();
 
     isShutdown.set(false);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
index 15741629389..a42adb7cdc8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalExcep
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.task.PipeTask;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
@@ -33,11 +34,15 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.extractor.IoTDBDataRegionExtractor;
 import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
+import org.apache.iotdb.db.pipe.metric.PipeExtractorMetrics;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.db.pipe.task.PipeDataNodeTask;
 import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeBuilder;
 import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeTaskDataRegionBuilder;
 import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeTaskSchemaRegionBuilder;
+import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
 import org.apache.iotdb.db.utils.DateTimeUtils;
 import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
@@ -52,8 +57,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.ConfigRegion;
 
@@ -309,4 +317,73 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent {
 
     PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true);
   }
+
+  ///////////////////////// Restart Logic /////////////////////////
+
+  public void restartAllStuckPipes() {
+    if (!tryWriteLockWithTimeOut(5)) {
+      return;
+    }
+    try {
+      restartAllStuckPipesInternal();
+    } finally {
+      releaseWriteLock();
+    }
+  }
+
+  private void restartAllStuckPipesInternal() {
+    final Map<String, IoTDBDataRegionExtractor> taskId2ExtractorMap =
+        PipeExtractorMetrics.getInstance().getExtractorMap();
+
+    final Set<PipeMeta> stuckPipes = new HashSet<>();
+    for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
+      final String pipeName = pipeMeta.getStaticMeta().getPipeName();
+      final List<IoTDBDataRegionExtractor> extractors =
+          taskId2ExtractorMap.values().stream()
+              .filter(e -> e.getPipeName().equals(pipeName))
+              .collect(Collectors.toList());
+      if (extractors.isEmpty()
+          || !extractors.get(0).isStreamMode()
+          || extractors.stream()
+              .noneMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles)) {
+        continue;
+      }
+
+      if (mayLinkedTsFileCountReachDangerousThreshold()
+          || mayMemTablePinnedCountReachDangerousThreshold()
+          || mayWalSizeReachThrottleThreshold()) {
+        LOGGER.warn("Pipe {} may be stuck.", pipeMeta.getStaticMeta());
+        stuckPipes.add(pipeMeta);
+      }
+    }
+
+    // Restart all stuck pipes
+    stuckPipes.parallelStream().forEach(this::restartStuckPipe);
+  }
+
+  private boolean mayLinkedTsFileCountReachDangerousThreshold() {
+    return PipeResourceManager.tsfile().getLinkedTsfileCount()
+        >= 2 * PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
+  }
+
+  private boolean mayMemTablePinnedCountReachDangerousThreshold() {
+    return PipeResourceManager.wal().getPinnedWalCount()
+        >= 10 * PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount();
+  }
+
+  private boolean mayWalSizeReachThrottleThreshold() {
+    return 3 * WALManager.getInstance().getTotalDiskUsage()
+        > 2 * IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold();
+  }
+
+  private void restartStuckPipe(PipeMeta pipeMeta) {
+    LOGGER.warn("Pipe {} will be restarted because of stuck.", pipeMeta.getStaticMeta());
+    final long startTime = System.currentTimeMillis();
+    handleDropPipeInternal(pipeMeta.getStaticMeta().getPipeName());
+    handleSinglePipeMetaChangesInternal(pipeMeta);
+    LOGGER.warn(
+        "Pipe {} was restarted because of stuck, time cost: {} ms.",
+        pipeMeta.getStaticMeta(),
+        System.currentTimeMillis() - startTime);
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index 58d274b82b1..c13f4071a67 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -372,6 +372,17 @@ public class IoTDBDataRegionExtractor implements PipeExtractor {
     }
   }
 
+  //////////////////////////// APIs provided for detecting stuck ////////////////////////////
+
+  public boolean isStreamMode() {
+    return realtimeExtractor instanceof PipeRealtimeDataRegionHybridExtractor
+        || realtimeExtractor instanceof PipeRealtimeDataRegionLogExtractor;
+  }
+
+  public boolean hasConsumedAllHistoricalTsFiles() {
+    return historicalExtractor.hasConsumedAll();
+  }
+
   //////////////////////////// APIs provided for metric framework ////////////////////////////
 
   public String getTaskID() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 91b786bfffa..568c96b5263 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -224,10 +224,12 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
     //  the write operation will be throttled, so we should not extract any more tablet events.
     //  3. The number of pinned memtables has reached the dangerous threshold.
     //  4. The number of tsfile events in the pending queue has exceeded the limit.
+    //  5. The number of linked tsfiles has reached the dangerous threshold.
     return !isStartedToSupply
         || mayWalSizeReachThrottleThreshold()
         || mayMemTablePinnedCountReachDangerousThreshold()
-        || isTsFileEventCountInQueueExceededLimit();
+        || isTsFileEventCountInQueueExceededLimit()
+        || mayTsFileLinkedCountReachDangerousThreshold();
   }
 
   private boolean mayWalSizeReachThrottleThreshold() {
@@ -247,6 +249,11 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
         >= PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
   }
 
+  private boolean mayTsFileLinkedCountReachDangerousThreshold() {
+    return PipeResourceManager.tsfile().getLinkedTsfileCount()
+        >= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
+  }
+
   public void informProcessorEventCollectorQueueTsFileSize(int queueSize) {
     processorEventCollectorQueueTsFileSize.set(queueSize);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
index f56672c7d6d..b9dfb0c8773 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
@@ -35,7 +35,6 @@ import org.checkerframework.checker.nullness.qual.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -46,7 +45,7 @@ public class PipeExtractorMetrics implements IMetricSet {
 
   private AbstractMetricService metricService;
 
-  private final Map<String, IoTDBDataRegionExtractor> extractorMap = new HashMap<>();
+  private final Map<String, IoTDBDataRegionExtractor> extractorMap = new ConcurrentHashMap<>();
 
   private final Map<String, Rate> tabletRateMap = new ConcurrentHashMap<>();
 
@@ -56,6 +55,10 @@ public class PipeExtractorMetrics implements IMetricSet {
 
   private final Map<String, Gauge> recentProcessedTsFileEpochStateMap = new ConcurrentHashMap<>();
 
+  public Map<String, IoTDBDataRegionExtractor> getExtractorMap() {
+    return extractorMap;
+  }
+
   //////////////////////////// bindTo & unbindFrom (metric framework) ////////////////////////////
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 061944af1a0..967ca2d6b86 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -355,19 +355,19 @@ public class PipeConnectorSubtask extends PipeDataNodeSubtask {
     return connectorIndex;
   }
 
-  public Integer getTsFileInsertionEventCount() {
+  public int getTsFileInsertionEventCount() {
     return inputPendingQueue.getTsFileInsertionEventCount();
   }
 
-  public Integer getTabletInsertionEventCount() {
+  public int getTabletInsertionEventCount() {
     return inputPendingQueue.getTabletInsertionEventCount();
   }
 
-  public Integer getPipeHeartbeatEventCount() {
+  public int getPipeHeartbeatEventCount() {
     return inputPendingQueue.getPipeHeartbeatEventCount();
   }
 
-  public Integer getAsyncConnectorRetryEventQueueSize() {
+  public int getAsyncConnectorRetryEventQueueSize() {
     return outputPipeConnector instanceof IoTDBThriftAsyncConnector
         ? ((IoTDBThriftAsyncConnector) outputPipeConnector).getRetryEventQueueSize()
         : 0;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index a461452d807..11fd755a8ca 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -185,14 +185,16 @@ public class CommonConfig {
 
   private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 2;
   private int pipeMaxAllowedPinnedMemTableCount = 50;
+  private long pipeMaxAllowedLinkedTsFileCount = 100;
+  private long pipeStuckRestartIntervalSeconds = 120;
 
   private boolean pipeMemoryManagementEnabled = true;
   private long pipeMemoryAllocateRetryIntervalMs = 1000;
   private int pipeMemoryAllocateMaxRetries = 10;
   private long pipeMemoryAllocateMinSizeInBytes = 32;
-  private long pipeMemoryAllocateForTsFileSequenceReaderInBytes = 2 * 1024 * 1024; // 2MB
-  private long pipeMemoryExpanderIntervalSeconds = 3 * 60; // 3Min
-  private float PipeLeaderCacheMemoryUsagePercentage = 0.1F;
+  private long pipeMemoryAllocateForTsFileSequenceReaderInBytes = (long) 2 * 1024 * 1024; // 2MB
+  private long pipeMemoryExpanderIntervalSeconds = (long) 3 * 60; // 3Min
+  private float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
 
   /** Whether to use persistent schema mode. */
   private String schemaEngineMode = "Memory";
@@ -754,6 +756,22 @@ public class CommonConfig {
     this.pipeMaxAllowedPinnedMemTableCount = pipeMaxAllowedPinnedMemTableCount;
   }
 
+  public long getPipeMaxAllowedLinkedTsFileCount() {
+    return pipeMaxAllowedLinkedTsFileCount;
+  }
+
+  public void setPipeMaxAllowedLinkedTsFileCount(long pipeMaxAllowedLinkedTsFileCount) {
+    this.pipeMaxAllowedLinkedTsFileCount = pipeMaxAllowedLinkedTsFileCount;
+  }
+
+  public long getPipeStuckRestartIntervalSeconds() {
+    return pipeStuckRestartIntervalSeconds;
+  }
+
+  public void setPipeStuckRestartIntervalSeconds(long pipeStuckRestartIntervalSeconds) {
+    this.pipeStuckRestartIntervalSeconds = pipeStuckRestartIntervalSeconds;
+  }
+
   public boolean getPipeMemoryManagementEnabled() {
     return pipeMemoryManagementEnabled;
   }
@@ -805,11 +823,11 @@ public class CommonConfig {
   }
 
   public float getPipeLeaderCacheMemoryUsagePercentage() {
-    return PipeLeaderCacheMemoryUsagePercentage;
+    return pipeLeaderCacheMemoryUsagePercentage;
   }
 
   public void setPipeLeaderCacheMemoryUsagePercentage(float pipeLeaderCacheMemoryUsagePercentage) {
-    this.PipeLeaderCacheMemoryUsagePercentage = pipeLeaderCacheMemoryUsagePercentage;
+    this.pipeLeaderCacheMemoryUsagePercentage = pipeLeaderCacheMemoryUsagePercentage;
   }
 
   public String getSchemaEngineMode() {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index ab51e7b6428..43ec8f911b4 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -427,6 +427,16 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_max_allowed_pinned_memtable_count",
                 String.valueOf(config.getPipeMaxAllowedPinnedMemTableCount()))));
+    config.setPipeMaxAllowedLinkedTsFileCount(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_max_allowed_linked_tsfile_count",
+                String.valueOf(config.getPipeMaxAllowedLinkedTsFileCount()))));
+    config.setPipeStuckRestartIntervalSeconds(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_stuck_restart_interval_seconds",
+                String.valueOf(config.getPipeStuckRestartIntervalSeconds()))));
 
     config.setPipeMemoryManagementEnabled(
         Boolean.parseBoolean(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 0a32cd85dfc..3b7bf7712f7 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -120,7 +120,7 @@ public abstract class PipeTaskAgent {
     }
   }
 
-  private TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChangesInternal(
+  protected TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChangesInternal(
       PipeMeta pipeMetaFromCoordinator) {
     // Do nothing if node is removing or removed
     if (isShutdown()) {
@@ -291,7 +291,7 @@ public abstract class PipeTaskAgent {
     }
   }
 
-  private TPushPipeMetaRespExceptionMessage handleDropPipeInternal(String pipeName) {
+  protected TPushPipeMetaRespExceptionMessage handleDropPipeInternal(String pipeName) {
     // Do nothing if node is removing or removed
     if (isShutdown()) {
       return null;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 32fb5b0fdbb..74a6711a96b 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -173,6 +173,14 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeMaxAllowedPinnedMemTableCount();
   }
 
+  public long getPipeMaxAllowedLinkedTsFileCount() {
+    return COMMON_CONFIG.getPipeMaxAllowedLinkedTsFileCount();
+  }
+
+  public long getPipeStuckRestartIntervalSeconds() {
+    return COMMON_CONFIG.getPipeStuckRestartIntervalSeconds();
+  }
+
   /////////////////////////////// Memory ///////////////////////////////
 
   public boolean getPipeMemoryManagementEnabled() {
@@ -241,6 +249,8 @@ public class PipeConfig {
     LOGGER.info(
         "PipeConnectorRPCThriftCompressionEnabled: {}",
         isPipeConnectorRPCThriftCompressionEnabled());
+    LOGGER.info(
+        "PipeLeaderCacheMemoryUsagePercentage: {}", getPipeLeaderCacheMemoryUsagePercentage());
 
     LOGGER.info("PipeAsyncConnectorSelectorNumber: {}", getPipeAsyncConnectorSelectorNumber());
     LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}", getPipeAsyncConnectorMaxClientNumber());
@@ -264,6 +274,8 @@ public class PipeConfig {
         "PipeMaxAllowedPendingTsFileEpochPerDataRegion: {}",
         getPipeMaxAllowedPendingTsFileEpochPerDataRegion());
     LOGGER.info("PipeMaxAllowedPinnedMemTableCount: {}", getPipeMaxAllowedPinnedMemTableCount());
+    LOGGER.info("PipeMaxAllowedLinkedTsFileCount: {}", getPipeMaxAllowedLinkedTsFileCount());
+    LOGGER.info("PipeStuckRestartIntervalSeconds: {}", getPipeStuckRestartIntervalSeconds());
 
     LOGGER.info("PipeMemoryManagementEnabled: {}", getPipeMemoryManagementEnabled());
     LOGGER.info("PipeMemoryAllocateMaxRetries: {}", getPipeMemoryAllocateMaxRetries());
@@ -273,9 +285,6 @@ public class PipeConfig {
     LOGGER.info(
         "PipeMemoryAllocateForTsFileSequenceReaderInBytes: {}",
         getPipeMemoryAllocateForTsFileSequenceReaderInBytes());
-
-    LOGGER.info(
-        "PipeLeaderCacheMemoryUsagePercentage: {}", getPipeLeaderCacheMemoryUsagePercentage());
   }
 
   /////////////////////////////// Singleton ///////////////////////////////

Reply via email to