This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new e95510d4aeb Pipe: Optimize realtime performance when pipe starts after 
long time stop with heavy data backlog (#15048)
e95510d4aeb is described below

commit e95510d4aeb94c12e5646e76fa33c4584615b072
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Mar 10 19:34:36 2025 +0800

    Pipe: Optimize realtime performance when pipe starts after long time stop 
with heavy data backlog (#15048)
---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 54 +++++++++++++++++++++-
 .../PipeRealtimePriorityBlockingQueue.java         | 49 ++++++++++++++------
 .../async/IoTDBDataRegionAsyncConnector.java       | 20 +++++++-
 .../PipeRealtimeDataRegionHybridExtractor.java     |  7 +++
 .../dataregion/wal/utils/WALInsertNodeCache.java   |  7 ++-
 .../apache/iotdb/commons/conf/CommonConfig.java    | 44 +++++++++++++++---
 .../iotdb/commons/conf/CommonDescriptor.java       | 28 +++++++++--
 .../commons/pipe/agent/task/PipeTaskAgent.java     |  2 +-
 .../iotdb/commons/pipe/config/PipeConfig.java      | 27 +++++++++--
 9 files changed, 205 insertions(+), 33 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 34d11913dff..f521698b87c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -111,6 +111,54 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     return new PipeDataNodeBuilder(pipeMetaFromConfigNode).build();
   }
 
+  ////////////////////////// Manage by Pipe Name //////////////////////////
+
+  @Override
+  protected void startPipe(final String pipeName, final long creationTime) {
+    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+    final PipeStatus status = 
existedPipeMeta.getRuntimeMeta().getStatus().get();
+    if (PipeStatus.STOPPED.equals(status) || status == null) {
+      restartPipeToReloadResourceIfNeeded(existedPipeMeta);
+    }
+
+    super.startPipe(pipeName, creationTime);
+  }
+
+  private void restartPipeToReloadResourceIfNeeded(final PipeMeta pipeMeta) {
+    if (System.currentTimeMillis() - pipeMeta.getStaticMeta().getCreationTime()
+        < PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs()) {
+      return;
+    }
+
+    final AtomicLong lastRestartTime =
+        
PIPE_NAME_TO_LAST_RESTART_TIME_MAP.get(pipeMeta.getStaticMeta().getPipeName());
+    if (lastRestartTime != null
+        && System.currentTimeMillis() - lastRestartTime.get()
+            < PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs()) {
+      LOGGER.info(
+          "Skipping reload resource for stopped pipe {} before starting it 
because reloading resource is too frequent.",
+          pipeMeta.getStaticMeta().getPipeName());
+      return;
+    }
+
+    if (PIPE_NAME_TO_LAST_RESTART_TIME_MAP.isEmpty()) {
+      LOGGER.info(
+          "Flushing storage engine before restarting pipe {}.",
+          pipeMeta.getStaticMeta().getPipeName());
+      final long currentTime = System.currentTimeMillis();
+      StorageEngine.getInstance().syncCloseAllProcessor();
+      WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
+      LOGGER.info(
+          "Finished flushing storage engine, time cost: {} ms.",
+          System.currentTimeMillis() - currentTime);
+    }
+
+    restartStuckPipe(pipeMeta);
+    LOGGER.info(
+        "Reloaded resource for stopped pipe {} before starting it.",
+        pipeMeta.getStaticMeta().getPipeName());
+  }
+
   ///////////////////////// Manage by regionGroupId /////////////////////////
 
   @Override
@@ -674,7 +722,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
   }
 
   private void restartStuckPipe(final PipeMeta pipeMeta) {
-    LOGGER.warn("Pipe {} will be restarted because of stuck.", 
pipeMeta.getStaticMeta());
+    LOGGER.warn(
+        "Pipe {} will be restarted because it is stuck or has encountered 
issues such as data backlog or being stopped for too long.",
+        pipeMeta.getStaticMeta());
     acquireWriteLock();
     try {
       final long startTime = System.currentTimeMillis();
@@ -688,7 +738,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       handleSinglePipeMetaChanges(originalPipeMeta);
 
       LOGGER.warn(
-          "Pipe {} was restarted because of stuck, time cost: {} ms.",
+          "Pipe {} was restarted because of stuck or data backlog, time cost: 
{} ms.",
           originalPipeMeta.getStaticMeta(),
           System.currentTimeMillis() - startTime);
     } catch (final Exception e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
index a170b19bd42..d9eac5f5625 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
@@ -33,6 +33,7 @@ import java.util.Objects;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 
 public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQueue<Event> {
@@ -40,10 +41,13 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
   private final BlockingDeque<TsFileInsertionEvent> tsfileInsertEventDeque =
       new LinkedBlockingDeque<>();
 
-  private final AtomicInteger eventCount = new AtomicInteger(0);
+  private static final int POLL_TSFILE_THRESHOLD =
+      PipeConfig.getInstance().getPipeRealTimeQueuePollTsFileThreshold();
+  private final AtomicInteger pollTsFileCounter = new AtomicInteger(0);
 
-  private static final int pollHistoryThreshold =
-      PipeConfig.getInstance().getPipeRealTimeQueuePollHistoryThreshold();
+  private static final int POLL_HISTORICAL_TSFILE_THRESHOLD =
+      
Math.max(PipeConfig.getInstance().getPipeRealTimeQueuePollHistoricalTsFileThreshold(),
 1);
+  private final AtomicLong pollHistoricalTsFileCounter = new AtomicLong(0);
 
   public PipeRealtimePriorityBlockingQueue() {
     super(new PipeDataRegionEventCounter());
@@ -81,18 +85,24 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
   @Override
   public Event directPoll() {
     Event event = null;
-    if (eventCount.get() >= pollHistoryThreshold) {
-      event = tsfileInsertEventDeque.pollFirst();
-      eventCount.set(0);
+    if (pollTsFileCounter.get() >= POLL_TSFILE_THRESHOLD) {
+      event =
+          pollHistoricalTsFileCounter.incrementAndGet() % 
POLL_HISTORICAL_TSFILE_THRESHOLD == 0
+              ? tsfileInsertEventDeque.pollFirst()
+              : tsfileInsertEventDeque.pollLast();
+      pollTsFileCounter.set(0);
     }
     if (Objects.isNull(event)) {
       // Sequentially poll the first offered non-TsFileInsertionEvent
       event = super.directPoll();
       if (Objects.isNull(event)) {
-        event = tsfileInsertEventDeque.pollFirst();
+        event =
+            pollHistoricalTsFileCounter.incrementAndGet() % 
POLL_HISTORICAL_TSFILE_THRESHOLD == 0
+                ? tsfileInsertEventDeque.pollFirst()
+                : tsfileInsertEventDeque.pollLast();
       }
       if (event != null) {
-        eventCount.incrementAndGet();
+        pollTsFileCounter.incrementAndGet();
       }
     }
 
@@ -113,18 +123,24 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
   @Override
   public Event waitedPoll() {
     Event event = null;
-    if (eventCount.get() >= pollHistoryThreshold) {
-      event = tsfileInsertEventDeque.pollFirst();
-      eventCount.set(0);
+    if (pollTsFileCounter.get() >= POLL_TSFILE_THRESHOLD) {
+      event =
+          pollHistoricalTsFileCounter.incrementAndGet() % 
POLL_HISTORICAL_TSFILE_THRESHOLD == 0
+              ? tsfileInsertEventDeque.pollFirst()
+              : tsfileInsertEventDeque.pollLast();
+      pollTsFileCounter.set(0);
     }
     if (event == null) {
       // Sequentially poll the first offered non-TsFileInsertionEvent
       event = super.directPoll();
       if (event == null && !tsfileInsertEventDeque.isEmpty()) {
-        event = tsfileInsertEventDeque.pollFirst();
+        event =
+            pollHistoricalTsFileCounter.incrementAndGet() % 
POLL_HISTORICAL_TSFILE_THRESHOLD == 0
+                ? tsfileInsertEventDeque.pollFirst()
+                : tsfileInsertEventDeque.pollLast();
       }
       if (event != null) {
-        eventCount.incrementAndGet();
+        pollTsFileCounter.incrementAndGet();
       }
     }
 
@@ -132,10 +148,13 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
     if (Objects.isNull(event)) {
       event = super.waitedPoll();
       if (Objects.isNull(event)) {
-        event = tsfileInsertEventDeque.pollFirst();
+        event =
+            pollHistoricalTsFileCounter.incrementAndGet() % 
POLL_HISTORICAL_TSFILE_THRESHOLD == 0
+                ? tsfileInsertEventDeque.pollFirst()
+                : tsfileInsertEventDeque.pollLast();
       }
       if (event != null) {
-        eventCount.incrementAndGet();
+        pollTsFileCounter.incrementAndGet();
       }
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index d98b93d9526..bf0b8df2d6c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtask;
@@ -92,6 +93,8 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
 
   private final IoTDBDataRegionSyncConnector retryConnector = new 
IoTDBDataRegionSyncConnector();
   private final BlockingQueue<Event> retryEventQueue = new 
LinkedBlockingQueue<>();
+  private final long maxRetryExecutionTimeMsPerCall =
+      
PipeConfig.getInstance().getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall();
 
   private IoTDBDataNodeAsyncClientManager clientManager;
 
@@ -416,11 +419,21 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
    * @see PipeConnector#transfer(TsFileInsertionEvent) for more details.
    */
   private void transferQueuedEventsIfNecessary() throws Exception {
+    if (retryEventQueue.isEmpty()) {
+      // Trigger cron heartbeat event in retry connector to send batch in time
+      retryConnector.transfer(PipeConnectorSubtask.CRON_HEARTBEAT_EVENT);
+      return;
+    }
+
+    final long retryStartTime = System.currentTimeMillis();
     while (!retryEventQueue.isEmpty()) {
       synchronized (this) {
-        if (isClosed.get() || retryEventQueue.isEmpty()) {
+        if (isClosed.get()) {
           return;
         }
+        if (retryEventQueue.isEmpty()) {
+          break;
+        }
 
         final Event peekedEvent = retryEventQueue.peek();
 
@@ -453,6 +466,11 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
           LOGGER.debug("Polled event {} from retry queue.", polledEvent);
         }
       }
+
+      // Stop retrying if the execution time exceeds the threshold for better 
realtime performance
+      if (System.currentTimeMillis() - retryStartTime > 
maxRetryExecutionTimeMsPerCall) {
+        break;
+      }
     }
 
     // Trigger cron heartbeat event in retry connector to send batch in time
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 26cc157e7ee..b1baca7c42a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -48,6 +48,9 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class);
 
+  private final boolean isPipeEpochKeepTsFileAfterStuckRestartEnabled =
+      PipeConfig.getInstance().isPipeEpochKeepTsFileAfterStuckRestartEnabled();
+
   @Override
   protected void doExtract(final PipeRealtimeEvent event) {
     final Event eventToExtract = event.getEvent();
@@ -223,6 +226,10 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
   }
 
   private boolean isPipeTaskCurrentlyRestarted(final PipeRealtimeEvent event) {
+    if (!isPipeEpochKeepTsFileAfterStuckRestartEnabled) {
+      return false;
+    }
+
     final boolean isPipeTaskCurrentlyRestarted =
         PipeDataNodeAgent.task().isPipeTaskCurrentlyRestarted(pipeName);
     if (isPipeTaskCurrentlyRestarted && event.mayExtractorUseTablets(this)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 76857c04983..0541ff96188 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.wal.utils;
 
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -58,6 +59,7 @@ public class WALInsertNodeCache {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(WALInsertNodeCache.class);
   private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+  private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
 
   private final PipeMemoryBlock allocatedMemoryBlock;
   // Used to adjust the memory usage of the cache
@@ -75,8 +77,9 @@ public class WALInsertNodeCache {
     final long requestedAllocateSize =
         (long)
             Math.min(
-                (double) 2 * CONFIG.getWalFileSizeThresholdInByte(),
-                CONFIG.getAllocateMemoryForPipe() * 0.8 / 5);
+                (double) PIPE_CONFIG.getPipeMaxAllowedPinnedMemTableCount()
+                    * CONFIG.getWalFileSizeThresholdInByte(),
+                CONFIG.getAllocateMemoryForPipe() * 0.45);
     allocatedMemoryBlock =
         PipeDataNodeResourceManager.memory()
             .tryAllocate(requestedAllocateSize)
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index cb69d5db391..d37e1c1eda9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -204,7 +204,8 @@ public class CommonConfig {
 
   private boolean pipeFileReceiverFsyncEnabled = true;
 
-  private int pipeRealTimeQueuePollHistoryThreshold = 1;
+  private int pipeRealTimeQueuePollTsFileThreshold = 10;
+  private int pipeRealTimeQueuePollHistoricalTsFileThreshold = 3;
 
   /** The maximum number of threads that can be used to execute subtasks in 
PipeSubtaskExecutor. */
   private int pipeSubtaskExecutorMaxThreadNum =
@@ -234,6 +235,7 @@ public class CommonConfig {
   private long pipeConnectorRetryIntervalMs = 1000L;
   private boolean pipeConnectorRPCThriftCompressionEnabled = false;
 
+  private long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall = 500;
   private int pipeAsyncConnectorSelectorNumber =
       Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
   private int pipeAsyncConnectorMaxClientNumber =
@@ -258,12 +260,13 @@ public class CommonConfig {
   private long pipeReceiverLoginPeriodicVerificationIntervalMs = 300000;
 
   private int pipeMaxAllowedHistoricalTsFilePerDataRegion = 100;
-  private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 10;
+  private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 5;
   private int pipeMaxAllowedPinnedMemTableCount = 10; // per data region
   private long pipeMaxAllowedLinkedTsFileCount = 300;
   private float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage = 0.1F;
   private long pipeStuckRestartIntervalSeconds = 120;
   private long pipeStuckRestartMinIntervalMs = 5 * 60 * 1000L; // 5 minutes
+  private boolean pipeEpochKeepTsFileAfterStuckRestartEnabled = false;
   private long pipeStorageEngineFlushTimeIntervalMs = Long.MAX_VALUE;
 
   private int pipeMetaReportMaxLogNumPerRound = 10;
@@ -857,6 +860,16 @@ public class CommonConfig {
     return pipeConnectorRPCThriftCompressionEnabled;
   }
 
+  public void setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
+      long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall) {
+    this.pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall =
+        pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall;
+  }
+
+  public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
+    return pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall;
+  }
+
   public int getPipeAsyncConnectorSelectorNumber() {
     return pipeAsyncConnectorSelectorNumber;
   }
@@ -990,12 +1003,22 @@ public class CommonConfig {
     this.pipeSubtaskExecutorForcedRestartIntervalMs = 
pipeSubtaskExecutorForcedRestartIntervalMs;
   }
 
-  public int getPipeRealTimeQueuePollHistoryThreshold() {
-    return pipeRealTimeQueuePollHistoryThreshold;
+  public int getPipeRealTimeQueuePollTsFileThreshold() {
+    return pipeRealTimeQueuePollTsFileThreshold;
+  }
+
+  public void setPipeRealTimeQueuePollTsFileThreshold(int 
pipeRealTimeQueuePollTsFileThreshold) {
+    this.pipeRealTimeQueuePollTsFileThreshold = 
pipeRealTimeQueuePollTsFileThreshold;
   }
 
-  public void setPipeRealTimeQueuePollHistoryThreshold(int 
pipeRealTimeQueuePollHistoryThreshold) {
-    this.pipeRealTimeQueuePollHistoryThreshold = 
pipeRealTimeQueuePollHistoryThreshold;
+  public int getPipeRealTimeQueuePollHistoricalTsFileThreshold() {
+    return pipeRealTimeQueuePollHistoricalTsFileThreshold;
+  }
+
+  public void setPipeRealTimeQueuePollHistoricalTsFileThreshold(
+      int pipeRealTimeQueuePollHistoricalTsFileThreshold) {
+    this.pipeRealTimeQueuePollHistoricalTsFileThreshold =
+        pipeRealTimeQueuePollHistoricalTsFileThreshold;
   }
 
   public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) {
@@ -1077,6 +1100,10 @@ public class CommonConfig {
     return pipeStuckRestartMinIntervalMs;
   }
 
+  public boolean isPipeEpochKeepTsFileAfterStuckRestartEnabled() {
+    return pipeEpochKeepTsFileAfterStuckRestartEnabled;
+  }
+
   public long getPipeStorageEngineFlushTimeIntervalMs() {
     return pipeStorageEngineFlushTimeIntervalMs;
   }
@@ -1089,6 +1116,11 @@ public class CommonConfig {
     this.pipeStuckRestartMinIntervalMs = pipeStuckRestartMinIntervalMs;
   }
 
+  public void setPipeEpochKeepTsFileAfterStuckRestartEnabled(
+      boolean pipeEpochKeepTsFileAfterStuckRestartEnabled) {
+    this.pipeEpochKeepTsFileAfterStuckRestartEnabled = 
pipeEpochKeepTsFileAfterStuckRestartEnabled;
+  }
+
   public void setPipeStorageEngineFlushTimeIntervalMs(long 
pipeStorageEngineFlushTimeIntervalMs) {
     this.pipeStorageEngineFlushTimeIntervalMs = 
pipeStorageEngineFlushTimeIntervalMs;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 2afe9bbdeb4..b3613bb6677 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -300,11 +300,19 @@ public class CommonDescriptor {
                 String.valueOf(
                     
config.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold()))));
 
-    config.setPipeRealTimeQueuePollHistoryThreshold(
+    config.setPipeRealTimeQueuePollTsFileThreshold(
+        Integer.parseInt(
+            Optional.ofNullable(
+                    
properties.getProperty("pipe_realtime_queue_poll_history_threshold"))
+                .orElse(
+                    properties.getProperty(
+                        "pipe_realtime_queue_poll_tsfile_threshold",
+                        
String.valueOf(config.getPipeRealTimeQueuePollTsFileThreshold())))));
+    config.setPipeRealTimeQueuePollHistoricalTsFileThreshold(
         Integer.parseInt(
             properties.getProperty(
-                "pipe_realtime_queue_poll_history_threshold",
-                
Integer.toString(config.getPipeRealTimeQueuePollHistoryThreshold()))));
+                "pipe_realtime_queue_poll_historical_tsfile_threshold",
+                
String.valueOf(config.getPipeRealTimeQueuePollHistoricalTsFileThreshold()))));
 
     int pipeSubtaskExecutorMaxThreadNum =
         Integer.parseInt(
@@ -413,6 +421,15 @@ public class CommonDescriptor {
                     properties.getProperty(
                         "pipe_connector_rpc_thrift_compression_enabled",
                         
String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled())))));
+    config.setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
+        Long.parseLong(
+            Optional.ofNullable(
+                    
properties.getProperty("pipe_async_sink_max_retry_execution_time_ms_per_call"))
+                .orElse(
+                    properties.getProperty(
+                        
"pipe_async_connector_max_retry_execution_time_ms_per_call",
+                        String.valueOf(
+                            
config.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())))));
     int pipeAsyncConnectorSelectorNumber =
         Integer.parseInt(
             
Optional.ofNullable(properties.getProperty("pipe_sink_selector_number"))
@@ -532,6 +549,11 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_stuck_restart_min_interval_ms",
                 String.valueOf(config.getPipeStuckRestartMinIntervalMs()))));
+    config.setPipeEpochKeepTsFileAfterStuckRestartEnabled(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "pipe_epoch_keep_tsfile_after_stuck_restart_enabled",
+                
String.valueOf(config.isPipeEpochKeepTsFileAfterStuckRestartEnabled()))));
     config.setPipeStorageEngineFlushTimeIntervalMs(
         Long.parseLong(
             properties.getProperty(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index addd9d1f0bf..41cf6df907d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -576,7 +576,7 @@ public abstract class PipeTaskAgent {
     return true;
   }
 
-  private void startPipe(final String pipeName, final long creationTime) {
+  protected void startPipe(final String pipeName, final long creationTime) {
     final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
 
     if (!checkBeforeStartPipe(existedPipeMeta, pipeName, creationTime)) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 4492df7a5db..0f558782f00 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -78,8 +78,12 @@ public class PipeConfig {
 
   /////////////////////////////// Subtask Connector 
///////////////////////////////
 
-  public int getPipeRealTimeQueuePollHistoryThreshold() {
-    return COMMON_CONFIG.getPipeRealTimeQueuePollHistoryThreshold();
+  public int getPipeRealTimeQueuePollTsFileThreshold() {
+    return COMMON_CONFIG.getPipeRealTimeQueuePollTsFileThreshold();
+  }
+
+  public int getPipeRealTimeQueuePollHistoricalTsFileThreshold() {
+    return COMMON_CONFIG.getPipeRealTimeQueuePollHistoricalTsFileThreshold();
   }
 
   /////////////////////////////// Subtask Executor 
///////////////////////////////
@@ -148,6 +152,10 @@ public class PipeConfig {
     return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled();
   }
 
+  public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
+    return COMMON_CONFIG.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall();
+  }
+
   public int getPipeAsyncConnectorSelectorNumber() {
     return COMMON_CONFIG.getPipeAsyncConnectorSelectorNumber();
   }
@@ -264,6 +272,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeStuckRestartMinIntervalMs();
   }
 
+  public boolean isPipeEpochKeepTsFileAfterStuckRestartEnabled() {
+    return COMMON_CONFIG.isPipeEpochKeepTsFileAfterStuckRestartEnabled();
+  }
+
   public long getPipeStorageEngineFlushTimeIntervalMs() {
     return COMMON_CONFIG.getPipeStorageEngineFlushTimeIntervalMs();
   }
@@ -373,7 +385,10 @@ public class PipeConfig {
         getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold());
 
     LOGGER.info(
-        "PipeRealTimeQueuePollHistoryThreshold: {}", 
getPipeRealTimeQueuePollHistoryThreshold());
+        "PipeRealTimeQueuePollTsFileThreshold: {}", 
getPipeRealTimeQueuePollTsFileThreshold());
+    LOGGER.info(
+        "PipeRealTimeQueuePollHistoricalTsFileThreshold: {}",
+        getPipeRealTimeQueuePollHistoricalTsFileThreshold());
 
     LOGGER.info("PipeSubtaskExecutorMaxThreadNum: {}", 
getPipeSubtaskExecutorMaxThreadNum());
     LOGGER.info(
@@ -423,6 +438,9 @@ public class PipeConfig {
         "PipeRemainingTimeCommitRateAverageTime: {}", 
getPipeRemainingTimeCommitRateAverageTime());
     LOGGER.info("PipeTsFileScanParsingThreshold(): {}", 
getPipeTsFileScanParsingThreshold());
 
+    LOGGER.info(
+        "PipeAsyncConnectorMaxRetryExecutionTimeMsPerCall: {}",
+        getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall());
     LOGGER.info("PipeAsyncConnectorSelectorNumber: {}", 
getPipeAsyncConnectorSelectorNumber());
     LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}", 
getPipeAsyncConnectorMaxClientNumber());
 
@@ -468,6 +486,9 @@ public class PipeConfig {
         getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage());
     LOGGER.info("PipeStuckRestartIntervalSeconds: {}", 
getPipeStuckRestartIntervalSeconds());
     LOGGER.info("PipeStuckRestartMinIntervalMs: {}", 
getPipeStuckRestartMinIntervalMs());
+    LOGGER.info(
+        "PipeEpochKeepTsFileAfterStuckRestartEnabled: {}",
+        isPipeEpochKeepTsFileAfterStuckRestartEnabled());
     LOGGER.info(
         "PipeStorageEngineFlushTimeIntervalMs: {}", 
getPipeStorageEngineFlushTimeIntervalMs());
 

Reply via email to