This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch optimize-pipe-after-stop
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/optimize-pipe-after-stop by this push:
     new 6ecf66fc722 op
6ecf66fc722 is described below

commit 6ecf66fc72229f87f9d6a6cd1952943b575d0cd8
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Mar 10 13:25:28 2025 +0800

    op
---
 .../thrift/async/IoTDBDataRegionAsyncConnector.java    | 18 +++++++++++++++++-
 .../org/apache/iotdb/commons/conf/CommonConfig.java    | 11 +++++++++++
 .../apache/iotdb/commons/conf/CommonDescriptor.java    |  9 +++++++++
 .../apache/iotdb/commons/pipe/config/PipeConfig.java   |  7 +++++++
 4 files changed, 44 insertions(+), 1 deletion(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index d98b93d9526..942e60df567 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtask;
@@ -92,6 +93,8 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
 
   private final IoTDBDataRegionSyncConnector retryConnector = new 
IoTDBDataRegionSyncConnector();
   private final BlockingQueue<Event> retryEventQueue = new 
LinkedBlockingQueue<>();
+  private final long maxRetryExecutionTimeMsPerCall =
+      
PipeConfig.getInstance().getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall();
 
   private IoTDBDataNodeAsyncClientManager clientManager;
 
@@ -416,11 +419,19 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
    * @see PipeConnector#transfer(TsFileInsertionEvent) for more details.
    */
   private void transferQueuedEventsIfNecessary() throws Exception {
+    if (retryEventQueue.isEmpty()) {
+      return;
+    }
+
+    final long currentTime = System.currentTimeMillis();
     while (!retryEventQueue.isEmpty()) {
       synchronized (this) {
-        if (isClosed.get() || retryEventQueue.isEmpty()) {
+        if (isClosed.get()) {
           return;
         }
+        if (retryEventQueue.isEmpty()) {
+          break;
+        }
 
         final Event peekedEvent = retryEventQueue.peek();
 
@@ -453,6 +464,11 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
           LOGGER.debug("Polled event {} from retry queue.", polledEvent);
         }
       }
+
+      // Stop retrying if the execution time exceeds the threshold for better realtime performance
+      if (System.currentTimeMillis() - currentTime > maxRetryExecutionTimeMsPerCall) {
+        break;
+      }
     }
 
     // Trigger cron heartbeat event in retry connector to send batch in time
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 340e7e26462..c432ea718ef 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -235,6 +235,7 @@ public class CommonConfig {
   private long pipeConnectorRetryIntervalMs = 1000L;
   private boolean pipeConnectorRPCThriftCompressionEnabled = false;
 
+  private long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall = 500;
   private int pipeAsyncConnectorSelectorNumber =
       Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
   private int pipeAsyncConnectorMaxClientNumber =
@@ -859,6 +860,16 @@ public class CommonConfig {
     return pipeConnectorRPCThriftCompressionEnabled;
   }
 
+  public void setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
+      long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall) {
+    this.pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall =
+        pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall;
+  }
+
+  public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
+    return pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall;
+  }
+
   public int getPipeAsyncConnectorSelectorNumber() {
     return pipeAsyncConnectorSelectorNumber;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 10b6978149c..b3613bb6677 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -421,6 +421,15 @@ public class CommonDescriptor {
                     properties.getProperty(
                         "pipe_connector_rpc_thrift_compression_enabled",
                         
String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled())))));
+    config.setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
+        Long.parseLong(
+            Optional.ofNullable(
+                    
properties.getProperty("pipe_async_sink_max_retry_execution_time_ms_per_call"))
+                .orElse(
+                    properties.getProperty(
+                        
"pipe_async_connector_max_retry_execution_time_ms_per_call",
+                        String.valueOf(
+                            
config.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())))));
     int pipeAsyncConnectorSelectorNumber =
         Integer.parseInt(
             
Optional.ofNullable(properties.getProperty("pipe_sink_selector_number"))
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index c0cc1a4d9fb..0f558782f00 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -152,6 +152,10 @@ public class PipeConfig {
     return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled();
   }
 
+  public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
+    return COMMON_CONFIG.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall();
+  }
+
   public int getPipeAsyncConnectorSelectorNumber() {
     return COMMON_CONFIG.getPipeAsyncConnectorSelectorNumber();
   }
@@ -434,6 +438,9 @@ public class PipeConfig {
         "PipeRemainingTimeCommitRateAverageTime: {}", 
getPipeRemainingTimeCommitRateAverageTime());
     LOGGER.info("PipeTsFileScanParsingThreshold(): {}", 
getPipeTsFileScanParsingThreshold());
 
+    LOGGER.info(
+        "PipeAsyncConnectorMaxRetryExecutionTimeMsPerCall: {}",
+        getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall());
     LOGGER.info("PipeAsyncConnectorSelectorNumber: {}", 
getPipeAsyncConnectorSelectorNumber());
     LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}", 
getPipeAsyncConnectorMaxClientNumber());
 

Reply via email to