This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-restart-stuck
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b75100175804b553897475415d448a46711c876b
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Jan 23 13:01:29 2024 +0800

    refactor
---
 .../db/pipe/agent/runtime/PipeAgentLauncher.java   |   6 -
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |   5 +
 .../db/pipe/agent/task/PipeTaskDataNodeAgent.java  | 133 ++++++++++++++-------
 .../pipe/extractor/IoTDBDataRegionExtractor.java   |  11 ++
 .../PipeRealtimeDataRegionHybridExtractor.java     |   9 +-
 .../subtask/connector/PipeConnectorSubtask.java    |  16 +--
 .../apache/iotdb/commons/conf/CommonConfig.java    |  43 +++----
 .../iotdb/commons/conf/CommonDescriptor.java       |  25 ++--
 .../iotdb/commons/pipe/config/PipeConfig.java      |  27 ++---
 9 files changed, 158 insertions(+), 117 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
index b3b2995c20b..b6b6c83763c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.agent.runtime;
 
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
 import 
org.apache.iotdb.commons.pipe.plugin.service.PipePluginClassLoaderManager;
 import 
org.apache.iotdb.commons.pipe.plugin.service.PipePluginExecutableManager;
@@ -172,11 +171,6 @@ class PipeAgentLauncher {
                         return pipeMeta;
                       })
                   .collect(Collectors.toList()));
-      PipeAgent.runtime()
-          .registerPeriodicalJob(
-              "PipeTaskAgent#RestartAllStuckPipes",
-              PipeAgent.task()::restartAllStuckPipes,
-              PipeConfig.getInstance().getPipeStuckRestartIntervalSeconds());
     } catch (Exception e) {
       LOGGER.info(
           "Failed to get pipe task meta from config node. Ignore the 
exception, "
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 2414613ad96..75997e483e2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -73,6 +73,11 @@ public class PipeRuntimeAgent implements IService {
   public synchronized void start() throws StartupException {
     PipeConfig.getInstance().printAllConfigs();
     PipeAgentLauncher.launchPipeTaskAgent();
+
+    registerPeriodicalJob(
+        "PipeTaskAgent#restartAllStuckPipes",
+        PipeAgent.task()::restartAllStuckPipes,
+        PipeConfig.getInstance().getPipeStuckRestartIntervalSeconds());
     pipePeriodicalJobExecutor.start();
 
     isShutdown.set(false);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
index 3884588de2c..4935872e507 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskDataNodeAgent.java
@@ -39,6 +39,7 @@ import 
org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNod
 import org.apache.iotdb.db.pipe.metric.PipeConnectorMetrics;
 import org.apache.iotdb.db.pipe.metric.PipeExtractorMetrics;
 import org.apache.iotdb.db.pipe.metric.PipeProcessorMetrics;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.db.pipe.task.PipeDataNodeTask;
 import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeBuilder;
 import org.apache.iotdb.db.pipe.task.builder.PipeDataNodeTaskDataRegionBuilder;
@@ -46,6 +47,7 @@ import 
org.apache.iotdb.db.pipe.task.builder.PipeDataNodeTaskSchemaRegionBuilder
 import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
 import 
org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskManager;
 import org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtask;
+import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
 import org.apache.iotdb.db.utils.DateTimeUtils;
 import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
@@ -324,59 +326,106 @@ public class PipeTaskDataNodeAgent extends PipeTaskAgent 
{
   ///////////////////////// Restart Logic /////////////////////////
 
   public void restartAllStuckPipes() {
-    if (!tryWriteLockWithTimeOut(0)) {
+    if (!tryWriteLockWithTimeOut(5)) {
       return;
     }
     try {
-      Map<String, IoTDBDataRegionExtractor> pipeName2ExtractorMap =
-          PipeExtractorMetrics.getInstance().getPipeName2ExtractorMap();
-      Map<String, PipeProcessorSubtask> pipeName2ProcessorSubtaskMap =
-          PipeProcessorMetrics.getInstance().getPipeName2ProcessorSubtaskMap();
-      Map<String, PipeConnectorSubtask> 
attributeSortedStr2PipeConnectorSubtaskMap =
-          
PipeConnectorMetrics.getInstance().getAttributeSortedStr2PipeConnectorSubtaskMap();
-
-      Set<PipeMeta> restartPipes = new HashSet<>();
-
-      for (PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
-        if 
(!pipeMeta.getRuntimeMeta().getStatus().get().equals(PipeStatus.RUNNING)) {
-          continue;
-        }
-        String pipeName = pipeMeta.getStaticMeta().getPipeName();
-        IoTDBDataRegionExtractor extractor = 
pipeName2ExtractorMap.get(pipeName);
-        PipeProcessorSubtask processorSubtask = 
pipeName2ProcessorSubtaskMap.get(pipeName);
-        PipeConnectorSubtask connectorSubtask =
-            attributeSortedStr2PipeConnectorSubtaskMap.get(
-                PipeConnectorSubtaskManager.getAttributeSortedString(
-                    pipeMeta.getStaticMeta().getConnectorParameters()));
-
-        if (Objects.isNull(extractor)
-            || Objects.isNull(processorSubtask)
-            || Objects.isNull(connectorSubtask)) {
-          continue;
-        }
+      restartAllStuckPipesInternal();
+    } finally {
+      releaseWriteLock();
+    }
+  }
 
-        boolean tsFileCountExceeded =
-            extractor.getRealtimeTsFileInsertionEventCount()
-                    + processorSubtask.getTsFileInsertionEventCount()
-                    + connectorSubtask.getTsFileInsertionEventCount()
-                > PipeConfig.getInstance().getPipeMaxAllowedTsFileCount();
-        boolean connectorStuck =
-            System.currentTimeMillis() - 
connectorSubtask.getLastSendExecutionTime()
-                > 
PipeConfig.getInstance().getPipeMaxAllowedConnectorStuckTime();
-        if (tsFileCountExceeded || connectorStuck) {
-          restartPipes.add(pipeMeta);
-        }
+  private void restartAllStuckPipesInternal() {
+    final Map<String, IoTDBDataRegionExtractor> pipeName2ExtractorMap =
+        PipeExtractorMetrics.getInstance().getPipeName2ExtractorMap();
+    final Map<String, PipeProcessorSubtask> pipeName2ProcessorSubtaskMap =
+        PipeProcessorMetrics.getInstance().getPipeName2ProcessorSubtaskMap();
+    final Map<String, PipeConnectorSubtask> 
attributeSortedStr2PipeConnectorSubtaskMap =
+        
PipeConnectorMetrics.getInstance().getAttributeSortedStr2PipeConnectorSubtaskMap();
+
+    final Set<PipeMeta> stuckPipes = new HashSet<>();
+    for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
+      if 
(PipeStatus.RUNNING.equals(pipeMeta.getRuntimeMeta().getStatus().get())) {
+        continue;
       }
 
-      restartPipes.parallelStream().forEach(this::restartPipeInternal);
+      final String pipeName = pipeMeta.getStaticMeta().getPipeName();
+      final IoTDBDataRegionExtractor extractor = 
pipeName2ExtractorMap.get(pipeName);
+      if (Objects.isNull(extractor)
+          || !extractor.isStreamMode()
+          || !extractor.hasConsumedAllHistoricalTsFiles()) {
+        continue;
+      }
+      final PipeProcessorSubtask processorSubtask = 
pipeName2ProcessorSubtaskMap.get(pipeName);
+      if (Objects.isNull(processorSubtask)) {
+        continue;
+      }
+      final PipeConnectorSubtask connectorSubtask =
+          attributeSortedStr2PipeConnectorSubtaskMap.get(
+              PipeConnectorSubtaskManager.getAttributeSortedString(
+                  pipeMeta.getStaticMeta().getConnectorParameters()));
+      if (Objects.isNull(connectorSubtask)) {
+        continue;
+      }
 
-    } finally {
-      releaseWriteLock();
+      if (isPipeLinkedTsFileEventCountExceededLimit(extractor, 
processorSubtask, connectorSubtask)
+          || mayLinkedTsFileCountReachDangerousThreshold()
+          || mayMemTablePinnedCountReachDangerousThreshold()
+          || mayWalSizeReachThrottleThreshold()) {
+        LOGGER.warn("Pipe {} may be stuck.", pipeMeta.getStaticMeta());
+        stuckPipes.add(pipeMeta);
+      }
     }
+
+    // Restart all stuck pipes
+    stuckPipes.parallelStream().forEach(this::restartStuckPipe);
+  }
+
+  private boolean isPipeLinkedTsFileEventCountExceededLimit(
+      IoTDBDataRegionExtractor extractor,
+      PipeProcessorSubtask processorSubtask,
+      PipeConnectorSubtask connectorSubtask) {
+    final int tabletInsertionEventCount =
+        extractor.getTabletInsertionEventCount()
+            + processorSubtask.getTabletInsertionEventCount()
+            + connectorSubtask.getTabletInsertionEventCount();
+    final int tsFileInsertionEventCount =
+        extractor.getRealtimeTsFileInsertionEventCount()
+            + processorSubtask.getTsFileInsertionEventCount()
+            + connectorSubtask.getTsFileInsertionEventCount();
+    return tabletInsertionEventCount > 0
+        && tsFileInsertionEventCount
+            > PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
+  }
+
+  private boolean mayLinkedTsFileCountReachDangerousThreshold() {
+    return PipeResourceManager.tsfile().getLinkedTsfileCount()
+        >= 2 * PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
+  }
+
+  private boolean mayMemTablePinnedCountReachDangerousThreshold() {
+    return PipeResourceManager.wal().getPinnedWalCount()
+        >= 10 * 
PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount();
+  }
+
+  private boolean mayWalSizeReachThrottleThreshold() {
+    return 3 * WALManager.getInstance().getTotalDiskUsage()
+        > 2 * IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold();
   }
 
-  private void restartPipeInternal(PipeMeta pipeMeta) {
+  private void restartStuckPipe(PipeMeta pipeMeta) {
+    LOGGER.warn(
+        "Pipe {} (creation time = {}) will be restarted because of stuck.",
+        pipeMeta.getStaticMeta().getPipeName(),
+        
DateTimeUtils.convertLongToDate(pipeMeta.getStaticMeta().getCreationTime(), 
"ms"));
+    final long startTime = System.currentTimeMillis();
     handleDropPipeInternal(pipeMeta.getStaticMeta().getPipeName());
     handleSinglePipeMetaChangesInternal(pipeMeta);
+    LOGGER.warn(
+        "Pipe {} (creation time = {}) was restarted because of stuck, time 
cost: {} ms.",
+        pipeMeta.getStaticMeta().getPipeName(),
+        
DateTimeUtils.convertLongToDate(pipeMeta.getStaticMeta().getCreationTime(), 
"ms"),
+        System.currentTimeMillis() - startTime);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index 58d274b82b1..c13f4071a67 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -372,6 +372,17 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
     }
   }
 
+  //////////////////////////// APIs provided for detecting stuck 
////////////////////////////
+
+  public boolean isStreamMode() {
+    return realtimeExtractor instanceof PipeRealtimeDataRegionHybridExtractor
+        || realtimeExtractor instanceof PipeRealtimeDataRegionLogExtractor;
+  }
+
+  public boolean hasConsumedAllHistoricalTsFiles() {
+    return historicalExtractor.hasConsumedAll();
+  }
+
   //////////////////////////// APIs provided for metric framework 
////////////////////////////
 
   public String getTaskID() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 91b786bfffa..568c96b5263 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -224,10 +224,12 @@ public class PipeRealtimeDataRegionHybridExtractor 
extends PipeRealtimeDataRegio
     //  the write operation will be throttled, so we should not extract any 
more tablet events.
     //  3. The number of pinned memtables has reached the dangerous threshold.
     //  4. The number of tsfile events in the pending queue has exceeded the 
limit.
+    //  5. The number of linked tsfiles has reached the dangerous threshold.
     return !isStartedToSupply
         || mayWalSizeReachThrottleThreshold()
         || mayMemTablePinnedCountReachDangerousThreshold()
-        || isTsFileEventCountInQueueExceededLimit();
+        || isTsFileEventCountInQueueExceededLimit()
+        || mayTsFileLinkedCountReachDangerousThreshold();
   }
 
   private boolean mayWalSizeReachThrottleThreshold() {
@@ -247,6 +249,11 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
         >= 
PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
   }
 
+  private boolean mayTsFileLinkedCountReachDangerousThreshold() {
+    return PipeResourceManager.tsfile().getLinkedTsfileCount()
+        >= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
+  }
+
   public void informProcessorEventCollectorQueueTsFileSize(int queueSize) {
     processorEventCollectorQueueTsFileSize.set(queueSize);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index e0d77389ec1..967ca2d6b86 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -77,7 +77,6 @@ public class PipeConnectorSubtask extends PipeDataNodeSubtask 
{
   private static final long CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_SECONDS =
       
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds();
   private long lastHeartbeatEventInjectTime = System.currentTimeMillis();
-  private long lastSendExecutionTime = System.currentTimeMillis();
 
   public PipeConnectorSubtask(
       String taskID,
@@ -124,7 +123,6 @@ public class PipeConnectorSubtask extends 
PipeDataNodeSubtask {
     final Event event = lastEvent != null ? lastEvent : 
inputPendingQueue.waitedPoll();
     // Record this event for retrying on connection failure or other exceptions
     setLastEvent(event);
-    lastSendExecutionTime = System.currentTimeMillis();
 
     try {
       if (event == null) {
@@ -357,27 +355,21 @@ public class PipeConnectorSubtask extends 
PipeDataNodeSubtask {
     return connectorIndex;
   }
 
-  public Integer getTsFileInsertionEventCount() {
+  public int getTsFileInsertionEventCount() {
     return inputPendingQueue.getTsFileInsertionEventCount();
   }
 
-  public Integer getTabletInsertionEventCount() {
+  public int getTabletInsertionEventCount() {
     return inputPendingQueue.getTabletInsertionEventCount();
   }
 
-  public Integer getPipeHeartbeatEventCount() {
+  public int getPipeHeartbeatEventCount() {
     return inputPendingQueue.getPipeHeartbeatEventCount();
   }
 
-  public Integer getAsyncConnectorRetryEventQueueSize() {
+  public int getAsyncConnectorRetryEventQueueSize() {
     return outputPipeConnector instanceof IoTDBThriftAsyncConnector
         ? ((IoTDBThriftAsyncConnector) 
outputPipeConnector).getRetryEventQueueSize()
         : 0;
   }
-
-  //////////////////////////// APIs provided for restart metric 
////////////////////////////
-
-  public long getLastSendExecutionTime() {
-    return lastSendExecutionTime;
-  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 754411cbd11..002183372c3 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -185,6 +185,7 @@ public class CommonConfig {
 
   private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 2;
   private int pipeMaxAllowedPinnedMemTableCount = 50;
+  private long pipeMaxAllowedLinkedTsFileCount = 100;
 
   private boolean pipeMemoryManagementEnabled = true;
   private long pipeMemoryAllocateRetryIntervalMs = 1000;
@@ -195,8 +196,6 @@ public class CommonConfig {
   private float pipeLeaderCacheMemoryUsagePercentage = 0.1F;
 
   private long pipeStuckRestartIntervalSeconds = 60;
-  private long pipeMaxAllowedTsFileCount = 1000;
-  private long pipeMaxAllowedConnectorStuckTime = (long) 15 * 60 * 1000;
 
   /** Whether to use persistent schema mode. */
   private String schemaEngineMode = "Memory";
@@ -758,6 +757,22 @@ public class CommonConfig {
     this.pipeMaxAllowedPinnedMemTableCount = pipeMaxAllowedPinnedMemTableCount;
   }
 
+  public long getPipeMaxAllowedLinkedTsFileCount() {
+    return pipeMaxAllowedLinkedTsFileCount;
+  }
+
+  public void setPipeMaxAllowedLinkedTsFileCount(long 
pipeMaxAllowedLinkedTsFileCount) {
+    this.pipeMaxAllowedLinkedTsFileCount = pipeMaxAllowedLinkedTsFileCount;
+  }
+
+  public long getPipeStuckRestartIntervalSeconds() {
+    return pipeStuckRestartIntervalSeconds;
+  }
+
+  public void setPipeStuckRestartIntervalSeconds(long 
pipeStuckRestartIntervalSeconds) {
+    this.pipeStuckRestartIntervalSeconds = pipeStuckRestartIntervalSeconds;
+  }
+
   public boolean getPipeMemoryManagementEnabled() {
     return pipeMemoryManagementEnabled;
   }
@@ -816,30 +831,6 @@ public class CommonConfig {
     this.pipeLeaderCacheMemoryUsagePercentage = 
pipeLeaderCacheMemoryUsagePercentage;
   }
 
-  public long getPipeStuckRestartIntervalSeconds() {
-    return pipeStuckRestartIntervalSeconds;
-  }
-
-  public void setPipeStuckRestartIntervalSeconds(long 
pipeStuckRestartIntervalSeconds) {
-    this.pipeStuckRestartIntervalSeconds = pipeStuckRestartIntervalSeconds;
-  }
-
-  public long getPipeMaxAllowedTsFileCount() {
-    return pipeMaxAllowedTsFileCount;
-  }
-
-  public void setPipeMaxAllowedTsFileCount(long pipeMaxAllowedTsFileCount) {
-    this.pipeMaxAllowedTsFileCount = pipeMaxAllowedTsFileCount;
-  }
-
-  public long getPipeMaxAllowedConnectorStuckTime() {
-    return pipeMaxAllowedConnectorStuckTime;
-  }
-
-  public void setPipeMaxAllowedConnectorStuckTime(long 
pipeMaxAllowedConnectorStuckTime) {
-    this.pipeMaxAllowedConnectorStuckTime = pipeMaxAllowedConnectorStuckTime;
-  }
-
   public String getSchemaEngineMode() {
     return schemaEngineMode;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index b3c5a425de4..43ec8f911b4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -427,6 +427,16 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_max_allowed_pinned_memtable_count",
                 
String.valueOf(config.getPipeMaxAllowedPinnedMemTableCount()))));
+    config.setPipeMaxAllowedLinkedTsFileCount(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_max_allowed_linked_tsfile_count",
+                String.valueOf(config.getPipeMaxAllowedLinkedTsFileCount()))));
+    config.setPipeStuckRestartIntervalSeconds(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_stuck_restart_interval_seconds",
+                String.valueOf(config.getPipeStuckRestartIntervalSeconds()))));
 
     config.setPipeMemoryManagementEnabled(
         Boolean.parseBoolean(
@@ -463,21 +473,6 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_leader_cache_memory_usage_percentage",
                 
String.valueOf(config.getPipeLeaderCacheMemoryUsagePercentage()))));
-    config.setPipeStuckRestartIntervalSeconds(
-        Long.parseLong(
-            properties.getProperty(
-                "pipe_stuck_restart_interval_seconds",
-                String.valueOf(config.getPipeStuckRestartIntervalSeconds()))));
-    config.setPipeMaxAllowedTsFileCount(
-        Long.parseLong(
-            properties.getProperty(
-                "pipe_max_allowed_tsfile_count",
-                String.valueOf(config.getPipeMaxAllowedTsFileCount()))));
-    config.setPipeMaxAllowedConnectorStuckTime(
-        Long.parseLong(
-            properties.getProperty(
-                "pipe_max_allowed_connector_stuck_time",
-                
String.valueOf(config.getPipeMaxAllowedConnectorStuckTime()))));
   }
 
   public void loadGlobalConfig(TGlobalConfig globalConfig) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 2691f88ecbe..74a6711a96b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -127,18 +127,6 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage();
   }
 
-  public long getPipeStuckRestartIntervalSeconds() {
-    return COMMON_CONFIG.getPipeStuckRestartIntervalSeconds();
-  }
-
-  public long getPipeMaxAllowedTsFileCount() {
-    return COMMON_CONFIG.getPipeMaxAllowedTsFileCount();
-  }
-
-  public long getPipeMaxAllowedConnectorStuckTime() {
-    return COMMON_CONFIG.getPipeMaxAllowedConnectorStuckTime();
-  }
-
   /////////////////////////////// Meta Consistency 
///////////////////////////////
 
   public boolean isSeperatedPipeHeartbeatEnabled() {
@@ -185,6 +173,14 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeMaxAllowedPinnedMemTableCount();
   }
 
+  public long getPipeMaxAllowedLinkedTsFileCount() {
+    return COMMON_CONFIG.getPipeMaxAllowedLinkedTsFileCount();
+  }
+
+  public long getPipeStuckRestartIntervalSeconds() {
+    return COMMON_CONFIG.getPipeStuckRestartIntervalSeconds();
+  }
+
   /////////////////////////////// Memory ///////////////////////////////
 
   public boolean getPipeMemoryManagementEnabled() {
@@ -253,6 +249,8 @@ public class PipeConfig {
     LOGGER.info(
         "PipeConnectorRPCThriftCompressionEnabled: {}",
         isPipeConnectorRPCThriftCompressionEnabled());
+    LOGGER.info(
+        "PipeLeaderCacheMemoryUsagePercentage: {}", 
getPipeLeaderCacheMemoryUsagePercentage());
 
     LOGGER.info("PipeAsyncConnectorSelectorNumber: {}", 
getPipeAsyncConnectorSelectorNumber());
     LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}", 
getPipeAsyncConnectorMaxClientNumber());
@@ -276,6 +274,8 @@ public class PipeConfig {
         "PipeMaxAllowedPendingTsFileEpochPerDataRegion: {}",
         getPipeMaxAllowedPendingTsFileEpochPerDataRegion());
     LOGGER.info("PipeMaxAllowedPinnedMemTableCount: {}", 
getPipeMaxAllowedPinnedMemTableCount());
+    LOGGER.info("PipeMaxAllowedLinkedTsFileCount: {}", 
getPipeMaxAllowedLinkedTsFileCount());
+    LOGGER.info("PipeStuckRestartIntervalSeconds: {}", 
getPipeStuckRestartIntervalSeconds());
 
     LOGGER.info("PipeMemoryManagementEnabled: {}", 
getPipeMemoryManagementEnabled());
     LOGGER.info("PipeMemoryAllocateMaxRetries: {}", 
getPipeMemoryAllocateMaxRetries());
@@ -285,9 +285,6 @@ public class PipeConfig {
     LOGGER.info(
         "PipeMemoryAllocateForTsFileSequenceReaderInBytes: {}",
         getPipeMemoryAllocateForTsFileSequenceReaderInBytes());
-
-    LOGGER.info(
-        "PipeLeaderCacheMemoryUsagePercentage: {}", 
getPipeLeaderCacheMemoryUsagePercentage());
   }
 
   /////////////////////////////// Singleton ///////////////////////////////

Reply via email to