This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 88ec7cf0e88 [To dev/1.3] Pipe: Fix TSFile transfer blocking InsertNode sending (#15666) (#15667)
88ec7cf0e88 is described below

commit 88ec7cf0e88ac477bb50a956387295264cb174fb
Author: Caideyipi <[email protected]>
AuthorDate: Sat Jun 7 14:11:23 2025 +0800

    [To dev/1.3] Pipe: Fix TSFile transfer blocking InsertNode sending (#15666) (#15667)
    
    * Pipe: Fix TSFile transfer blocking InsertNode sending
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    Co-authored-by: Zhenyu Luo <[email protected]>
---
 .../connector/PipeConnectorSubtaskManager.java     | 10 +++
 .../PipeRealtimePriorityBlockingQueue.java         | 22 +++++--
 .../client/IoTDBDataNodeAsyncClientManager.java    |  8 ++-
 .../async/IoTDBDataRegionAsyncConnector.java       | 76 +++++++++++++++++++---
 .../apache/iotdb/commons/conf/CommonConfig.java    | 30 ++++++++-
 .../iotdb/commons/pipe/config/PipeConfig.java      |  9 +++
 .../iotdb/commons/pipe/config/PipeDescriptor.java  | 10 +++
 7 files changed, 148 insertions(+), 17 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
index 10dc41a9323..e556da428ef 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeC
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import 
org.apache.iotdb.db.pipe.agent.task.execution.PipeConnectorSubtaskExecutor;
+import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.pipe.api.PipeConnector;
@@ -46,6 +47,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class PipeConnectorSubtaskManager {
 
@@ -110,12 +112,17 @@ public class PipeConnectorSubtaskManager {
       final List<PipeConnectorSubtaskLifeCycle> 
pipeConnectorSubtaskLifeCycleList =
           new ArrayList<>(connectorNum);
 
+      AtomicInteger counter = new AtomicInteger(0);
       // Shared pending queue for all subtasks
       final UnboundedBlockingPendingQueue<Event> pendingQueue =
           realTimeFirst
               ? new PipeRealtimePriorityBlockingQueue()
               : new UnboundedBlockingPendingQueue<>(new 
PipeDataRegionEventCounter());
 
+      if (realTimeFirst) {
+        ((PipeRealtimePriorityBlockingQueue) 
pendingQueue).setOfferTsFileCounter(counter);
+      }
+
       for (int connectorIndex = 0; connectorIndex < connectorNum; 
connectorIndex++) {
         final PipeConnector pipeConnector =
             isDataRegionConnector
@@ -126,6 +133,9 @@ public class PipeConnectorSubtaskManager {
         // 1. Construct, validate and customize PipeConnector, and then 
handshake (create
         // connection) with the target
         try {
+          if (pipeConnector instanceof IoTDBDataRegionAsyncConnector) {
+            ((IoTDBDataRegionAsyncConnector) 
pipeConnector).setTransferTsFileCounter(counter);
+          }
           pipeConnector.validate(new 
PipeParameterValidator(pipeConnectorParameters));
           pipeConnector.customize(
               pipeConnectorParameters, new 
PipeTaskRuntimeConfiguration(environment));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
index c8b050a5cfe..a4f05447eae 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
@@ -47,6 +47,9 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
 
   private final AtomicLong pollHistoricalTsFileCounter = new AtomicLong(0);
 
+  // Need to ensure that NPE does not occur
+  private AtomicInteger offerTsFileCounter = new AtomicInteger(0);
+
   public PipeRealtimePriorityBlockingQueue() {
     super(new PipeDataRegionEventCounter());
   }
@@ -85,18 +88,22 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
     Event event = null;
     final int pollHistoricalTsFileThreshold =
         PIPE_CONFIG.getPipeRealTimeQueuePollHistoricalTsFileThreshold();
+    final int realTimeQueueMaxWaitingTsFileSize =
+        PIPE_CONFIG.getPipeRealTimeQueueMaxWaitingTsFileSize();
 
-    if (pollTsFileCounter.get() >= 
PIPE_CONFIG.getPipeRealTimeQueuePollTsFileThreshold()) {
+    if (pollTsFileCounter.get() >= 
PIPE_CONFIG.getPipeRealTimeQueuePollTsFileThreshold()
+        && offerTsFileCounter.get() < realTimeQueueMaxWaitingTsFileSize) {
       event =
           pollHistoricalTsFileCounter.incrementAndGet() % 
pollHistoricalTsFileThreshold == 0
               ? tsfileInsertEventDeque.pollFirst()
               : tsfileInsertEventDeque.pollLast();
       pollTsFileCounter.set(0);
     }
+
     if (Objects.isNull(event)) {
       // Sequentially poll the first offered non-TsFileInsertionEvent
       event = super.directPoll();
-      if (Objects.isNull(event)) {
+      if (Objects.isNull(event) && offerTsFileCounter.get() < 
realTimeQueueMaxWaitingTsFileSize) {
         event =
             pollHistoricalTsFileCounter.incrementAndGet() % 
pollHistoricalTsFileThreshold == 0
                 ? tsfileInsertEventDeque.pollFirst()
@@ -126,8 +133,11 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
     Event event = null;
     final int pollHistoricalTsFileThreshold =
         PIPE_CONFIG.getPipeRealTimeQueuePollHistoricalTsFileThreshold();
+    final int realTimeQueueMaxWaitingTsFileSize =
+        PIPE_CONFIG.getPipeRealTimeQueueMaxWaitingTsFileSize();
 
-    if (pollTsFileCounter.get() >= 
PIPE_CONFIG.getPipeRealTimeQueuePollTsFileThreshold()) {
+    if (pollTsFileCounter.get() >= 
PIPE_CONFIG.getPipeRealTimeQueuePollTsFileThreshold()
+        && offerTsFileCounter.get() < realTimeQueueMaxWaitingTsFileSize) {
       event =
           pollHistoricalTsFileCounter.incrementAndGet() % 
pollHistoricalTsFileThreshold == 0
               ? tsfileInsertEventDeque.pollFirst()
@@ -149,7 +159,7 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
     }
 
     // If no event is available, block until an event is available
-    if (Objects.isNull(event)) {
+    if (Objects.isNull(event) && offerTsFileCounter.get() < 
realTimeQueueMaxWaitingTsFileSize) {
       event = super.waitedPoll();
       if (Objects.isNull(event)) {
         event =
@@ -233,4 +243,8 @@ public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQ
   public int getTsFileInsertionEventCount() {
     return tsfileInsertEventDeque.size();
   }
+
+  public void setOfferTsFileCounter(AtomicInteger offerTsFileCounter) {
+    this.offerTsFileCounter = offerTsFileCounter;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index ed42dfe10ef..551b0cb2dca 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -87,7 +87,8 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
       final boolean shouldReceiverConvertOnTypeMismatch,
       final String loadTsFileStrategy,
       final boolean validateTsFile,
-      final boolean shouldMarkAsPipeRequest) {
+      final boolean shouldMarkAsPipeRequest,
+      final boolean isTSFileUsed) {
     super(
         endPoints,
         username,
@@ -102,12 +103,13 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
 
     receiverAttributes =
         String.format(
-            "%s-%s-%s-%s-%s",
+            "%s-%s-%s-%s-%s-%s",
             Base64.getEncoder().encodeToString((username + ":" + 
password).getBytes()),
             shouldReceiverConvertOnTypeMismatch,
             loadTsFileStrategy,
             validateTsFile,
-            shouldMarkAsPipeRequest);
+            shouldMarkAsPipeRequest,
+            isTSFileUsed);
     synchronized (IoTDBDataNodeAsyncClientManager.class) {
       if 
(!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes))
 {
         ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index e4634dd4300..c50b651f3be 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -72,7 +72,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -93,12 +96,20 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
   private static final String THRIFT_ERROR_FORMATTER_WITH_ENDPOINT =
       "Exception occurred while sending to receiver %s:%s.";
 
+  private static final boolean isSplitTSFileBatchModeEnabled = true;
+  private static final ExecutorService executor =
+      
Executors.newFixedThreadPool(PipeConfig.getInstance().getPipeAsyncConnectorMaxClientNumber());
+
   private final IoTDBDataRegionSyncConnector syncConnector = new 
IoTDBDataRegionSyncConnector();
   private final BlockingQueue<Event> retryEventQueue = new 
LinkedBlockingQueue<>();
   private final PipeDataRegionEventCounter retryEventQueueEventCounter =
       new PipeDataRegionEventCounter();
 
   private IoTDBDataNodeAsyncClientManager clientManager;
+  private IoTDBDataNodeAsyncClientManager transferTsFileClientManager;
+
+  // It is necessary to ensure that other classes that inherit Async Connector 
will not have NPE
+  public AtomicInteger transferTsFileCounter = new AtomicInteger(0);
 
   private PipeTransferBatchReqBuilder tabletBatchBuilder;
 
@@ -141,7 +152,23 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
             shouldReceiverConvertOnTypeMismatch,
             loadTsFileStrategy,
             loadTsFileValidation,
-            shouldMarkAsPipeRequest);
+            shouldMarkAsPipeRequest,
+            false);
+
+    transferTsFileClientManager =
+        new IoTDBDataNodeAsyncClientManager(
+            nodeUrls,
+            parameters.getBooleanOrDefault(
+                Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY, 
CONNECTOR_LEADER_CACHE_ENABLE_KEY),
+                CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE),
+            loadBalanceStrategy,
+            username,
+            password,
+            shouldReceiverConvertOnTypeMismatch,
+            loadTsFileStrategy,
+            loadTsFileValidation,
+            shouldMarkAsPipeRequest,
+            isSplitTSFileBatchModeEnabled);
 
     if (isTabletBatchModeEnabled) {
       tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
@@ -373,14 +400,37 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     }
   }
 
-  private void transfer(final PipeTransferTsFileHandler 
pipeTransferTsFileHandler) {
-    AsyncPipeDataTransferServiceClient client = null;
-    try {
-      client = clientManager.borrowClient();
-      pipeTransferTsFileHandler.transfer(clientManager, client);
-    } catch (final Exception ex) {
-      logOnClientException(client, ex);
-      pipeTransferTsFileHandler.onError(ex);
+  private void transfer(final PipeTransferTsFileHandler 
pipeTransferTsFileHandler)
+      throws Exception {
+    transferTsFileCounter.incrementAndGet();
+    CompletableFuture<Void> completableFuture =
+        CompletableFuture.supplyAsync(
+            () -> {
+              AsyncPipeDataTransferServiceClient client = null;
+              try {
+                client = transferTsFileClientManager.borrowClient();
+                pipeTransferTsFileHandler.transfer(clientManager, client);
+              } catch (final Exception ex) {
+                logOnClientException(client, ex);
+                pipeTransferTsFileHandler.onError(ex);
+              } finally {
+                transferTsFileCounter.decrementAndGet();
+              }
+              return null;
+            },
+            executor);
+
+    if (PipeConfig.getInstance().isTransferTsFileSync()) {
+      try {
+        completableFuture.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOGGER.error("Transfer tsfile event asynchronously was interrupted.", 
e);
+        throw new PipeException("Transfer tsfile event asynchronously was 
interrupted.", e);
+      } catch (Exception e) {
+        LOGGER.error("Failed to transfer tsfile event asynchronously.", e);
+        throw e;
+      }
     }
   }
 
@@ -665,6 +715,10 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
       if (clientManager != null) {
         clientManager.close();
       }
+
+      if (transferTsFileClientManager != null) {
+        transferTsFileClientManager.close();
+      }
     } catch (final Exception e) {
       LOGGER.warn("Failed to close client manager.", e);
     }
@@ -717,4 +771,8 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
   public boolean hasPendingHandlers() {
     return !pendingHandlers.isEmpty();
   }
+
+  public void setTransferTsFileCounter(AtomicInteger transferTsFileCounter) {
+    this.transferTsFileCounter = transferTsFileCounter;
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 94cd826b430..6f6d8723eb3 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -207,6 +207,7 @@ public class CommonConfig {
 
   private int pipeRealTimeQueuePollTsFileThreshold = 10;
   private int pipeRealTimeQueuePollHistoricalTsFileThreshold = 3;
+  private int pipeRealTimeQueueMaxWaitingTsFileSize = 1;
 
   /** The maximum number of threads that can be used to execute subtasks in 
PipeSubtaskExecutor. */
   private int pipeSubtaskExecutorMaxThreadNum =
@@ -235,7 +236,7 @@ public class CommonConfig {
 
   private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
   private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
-  private int pipeConnectorReadFileBufferSize = 8388608;
+  private int pipeConnectorReadFileBufferSize = 5242880; // 5MB
   private boolean isPipeConnectorReadFileBufferMemoryControlEnabled = false;
   private long pipeConnectorRetryIntervalMs = 1000L;
   private boolean pipeConnectorRPCThriftCompressionEnabled = false;
@@ -310,6 +311,7 @@ public class CommonConfig {
   private double pipeThresholdAllocationStrategyMaximumMemoryIncrementRatio = 
0.1d;
   private double pipeThresholdAllocationStrategyLowUsageThreshold = 0.2d;
   private double pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold 
= 0.8d;
+  private boolean pipeTransferTsFileSync = false;
 
   private long twoStageAggregateMaxCombinerLiveTimeInMs = 8 * 60 * 1000L; // 8 
minutes
   private long twoStageAggregateDataRegionInfoCacheTimeInMs = 3 * 60 * 1000L; 
// 3 minutes
@@ -1335,6 +1337,20 @@ public class CommonConfig {
         pipeRealTimeQueuePollHistoricalTsFileThreshold);
   }
 
+  public int getPipeRealTimeQueueMaxWaitingTsFileSize() {
+    return pipeRealTimeQueueMaxWaitingTsFileSize;
+  }
+
+  public void setPipeRealTimeQueueMaxWaitingTsFileSize(int 
pipeRealTimeQueueMaxWaitingTsFileSize) {
+    if (this.pipeRealTimeQueueMaxWaitingTsFileSize == 
pipeRealTimeQueueMaxWaitingTsFileSize) {
+      return;
+    }
+    this.pipeRealTimeQueueMaxWaitingTsFileSize = 
pipeRealTimeQueueMaxWaitingTsFileSize;
+    logger.info(
+        "pipeRealTimeQueueMaxWaitingTsFileSize is set to {}.",
+        pipeRealTimeQueueMaxWaitingTsFileSize);
+  }
+
   public void setPipeAirGapReceiverEnabled(boolean pipeAirGapReceiverEnabled) {
     if (pipeAirGapReceiverEnabled == this.pipeAirGapReceiverEnabled) {
       return;
@@ -1951,6 +1967,18 @@ public class CommonConfig {
         pipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold);
   }
 
+  public boolean getPipeTransferTsFileSync() {
+    return pipeTransferTsFileSync;
+  }
+
+  public void setPipeTransferTsFileSync(boolean pipeTransferTsFileSync) {
+    if (this.pipeTransferTsFileSync == pipeTransferTsFileSync) {
+      return;
+    }
+    this.pipeTransferTsFileSync = pipeTransferTsFileSync;
+    logger.info("pipeTransferTsFileSync is set to {}", pipeTransferTsFileSync);
+  }
+
   public double getPipeAllSinksRateLimitBytesPerSecond() {
     return pipeAllSinksRateLimitBytesPerSecond;
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 011ea9c9d71..8c2b6dff2b0 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -101,6 +101,10 @@ public class PipeConfig {
     return 
Math.max(COMMON_CONFIG.getPipeRealTimeQueuePollHistoricalTsFileThreshold(), 1);
   }
 
+  public int getPipeRealTimeQueueMaxWaitingTsFileSize() {
+    return COMMON_CONFIG.getPipeRealTimeQueueMaxWaitingTsFileSize();
+  }
+
   /////////////////////////////// Subtask Executor 
///////////////////////////////
 
   public int getPipeSubtaskExecutorMaxThreadNum() {
@@ -255,6 +259,10 @@ public class PipeConfig {
     return 
COMMON_CONFIG.getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold();
   }
 
+  public boolean isTransferTsFileSync() {
+    return COMMON_CONFIG.getPipeTransferTsFileSync();
+  }
+
   /////////////////////////////// Meta Consistency 
///////////////////////////////
 
   public boolean isSeperatedPipeHeartbeatEnabled() {
@@ -520,6 +528,7 @@ public class PipeConfig {
     LOGGER.info(
         "PipePipeRemainingInsertEventCountAverage: {}", 
getPipeRemainingInsertNodeCountAverage());
     LOGGER.info("PipeTsFileScanParsingThreshold(): {}", 
getPipeTsFileScanParsingThreshold());
+    LOGGER.info("PipeTransferTsFileSync: {}", isTransferTsFileSync());
 
     LOGGER.info("PipeDynamicMemoryHistoryWeight: {}", 
getPipeDynamicMemoryHistoryWeight());
     LOGGER.info(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index cc9850e90d3..6303c8ad571 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -264,6 +264,11 @@ public class PipeDescriptor {
             properties.getProperty(
                 "pipe_realtime_queue_poll_historical_tsfile_threshold",
                 
String.valueOf(config.getPipeRealTimeQueuePollHistoricalTsFileThreshold()))));
+    config.setPipeRealTimeQueueMaxWaitingTsFileSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_realTime_queue_max_waiting_tsFile_size",
+                
String.valueOf(config.getPipeRealTimeQueueMaxWaitingTsFileSize()))));
     config.setPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount(
         Integer.parseInt(
             properties.getProperty(
@@ -526,6 +531,11 @@ public class PipeDescriptor {
                 "pipe_max_aligned_series_num_in_one_batch",
                 
String.valueOf(config.getPipeMaxAlignedSeriesNumInOneBatch()))));
 
+    config.setPipeTransferTsFileSync(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "pipe_transfer_tsfile_sync", 
String.valueOf(config.getPipeTransferTsFileSync()))));
+
     config.setPipeRemainingTimeCommitRateAutoSwitchSeconds(
         Long.parseLong(
             properties.getProperty(

Reply via email to