This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 403cf64f911 [To dev/1.3] Pipe: Modify the Pipe configuration item log name to be consistent with the Properties name. (#16732) (#16759)
403cf64f911 is described below

commit 403cf64f9119d9f5c13dfece52596df43ab448aa
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Nov 17 23:00:43 2025 +0800

    [To dev/1.3] Pipe: Modify the Pipe configuration item log name to be consistent with the Properties name. (#16732) (#16759)
    
    * Pipe: Modify the Pipe configuration item log name to be consistent with the Properties name. (#16732)
    
    * Pipe: Modify the Pipe configuration item log name to be consistent with the Properties name.
    
    * update
    
    (cherry picked from commit a6191d916b2ceca7f3b22e728ae4117b99290261)
    
    * update
    
    * update
---
 .../iotdb/tool/tsfile/ImportTsFileRemotely.java    |  3 +-
 .../exchange/sender/TwoStageAggregateSender.java   |  2 +-
 .../client/IoTDBDataNodeAsyncClientManager.java    |  4 +-
 .../thrift/async/IoTDBDataRegionAsyncSink.java     | 18 ++---
 .../apache/iotdb/commons/conf/CommonConfig.java    | 84 +++++++++++-----------
 .../iotdb/commons/pipe/config/PipeConfig.java      | 31 ++++----
 .../iotdb/commons/pipe/config/PipeDescriptor.java  | 20 +++---
 .../pipe/sink/client/IoTDBSyncClientManager.java   |  2 +-
 8 files changed, 75 insertions(+), 89 deletions(-)

diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
index d842b9803af..855c1678a84 100644
--- 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
@@ -295,8 +295,7 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
       this.client =
           new IoTDBSyncClient(
               new ThriftClientProperty.Builder()
-                  .setConnectionTimeoutMs(
-                      
PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs())
+                  
.setConnectionTimeoutMs(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs())
                   .setRpcThriftCompressionEnabled(
                       
PipeConfig.getInstance().isPipeConnectorRPCThriftCompressionEnabled())
                   .build(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
index bac357368c0..45d6e45d25c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
@@ -208,7 +208,7 @@ public class TwoStageAggregateSender implements 
AutoCloseable {
   private IoTDBSyncClient constructIoTDBSyncClient(TEndPoint endPoint) throws 
TTransportException {
     return new IoTDBSyncClient(
         new ThriftClientProperty.Builder()
-            
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
+            
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs())
             .setRpcThriftCompressionEnabled(
                 PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
             .build(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 5901e488a58..97a4d2621b0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -300,7 +300,7 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
           PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST,
           Boolean.toString(shouldMarkAsPipeRequest));
 
-      
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
+      
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs());
       
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params),
 callback);
       waitHandshakeFinished(isHandshakeFinished);
 
@@ -319,7 +319,7 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
         resp.set(null);
         exception.set(null);
 
-        
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
+        
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs());
         client.pipeTransfer(
             PipeTransferDataNodeHandshakeV1Req.toTPipeTransferReq(
                 
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 2b57d94bfc3..e8eca26293c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -519,14 +519,11 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
     if ((retryEventQueue.isEmpty() && retryTsFileQueue.isEmpty())
         || (!forced
             && retryEventQueueEventCounter.getTabletInsertionEventCount()
-                < PipeConfig.getInstance()
-                    
.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold()
+                < 
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTabletEventQueueSize()
             && retryEventQueueEventCounter.getTsFileInsertionEventCount()
-                < PipeConfig.getInstance()
-                    
.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold()
+                < 
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTsFileEventQueueSize()
             && retryEventQueue.size() + retryTsFileQueue.size()
-                < PipeConfig.getInstance()
-                    
.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold())) {
+                < 
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTotalEventQueueSize())) {
       return;
     }
 
@@ -584,14 +581,11 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
       if (System.currentTimeMillis() - retryStartTime
           > 
PipeConfig.getInstance().getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall()) 
{
         if (retryEventQueueEventCounter.getTabletInsertionEventCount()
-                < PipeConfig.getInstance()
-                    
.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold()
+                < 
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTabletEventQueueSize()
             && retryEventQueueEventCounter.getTsFileInsertionEventCount()
-                < PipeConfig.getInstance()
-                    
.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold()
+                < 
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTsFileEventQueueSize()
             && retryEventQueue.size() + retryTsFileQueue.size()
-                < PipeConfig.getInstance()
-                    
.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold()) {
+                < 
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTotalEventQueueSize()) {
           return;
         }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 091fb56def2..9951fe217a8 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -259,16 +259,16 @@ public class CommonConfig {
   private long pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes = 72 * KB;
   private long pipeSourceMatcherCacheSize = 1024;
 
-  private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
+  private int pipeSinkHandshakeTimeoutMs = 10 * 1000; // 10 seconds
   private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
   private int pipeConnectorReadFileBufferSize = 5242880; // 5MB
   private boolean isPipeConnectorReadFileBufferMemoryControlEnabled = false;
   private long pipeConnectorRetryIntervalMs = 1000L;
   private boolean pipeConnectorRPCThriftCompressionEnabled = false;
 
-  private int pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold = 5;
-  private int pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold = 20;
-  private int pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold = 30;
+  private int pipeAsyncSinkForcedRetryTsFileEventQueueSize = 5;
+  private int pipeAsyncSinkForcedRetryTabletEventQueueSize = 20;
+  private int pipeAsyncSinkForcedRetryTotalEventQueueSize = 30;
   private long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall = 500;
   private int pipeAsyncConnectorSelectorNumber =
       Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
@@ -988,22 +988,21 @@ public class CommonConfig {
     logger.info("pipeExtractorMatcherCacheSize is set to {}.", 
pipeSourceMatcherCacheSize);
   }
 
-  public int getPipeConnectorHandshakeTimeoutMs() {
-    return pipeConnectorHandshakeTimeoutMs;
+  public int getPipeSinkHandshakeTimeoutMs() {
+    return pipeSinkHandshakeTimeoutMs;
   }
 
-  public void setPipeConnectorHandshakeTimeoutMs(long 
pipeConnectorHandshakeTimeoutMs) {
-    final int fPipeConnectorHandshakeTimeoutMs = 
this.pipeConnectorHandshakeTimeoutMs;
+  public void setPipeSinkHandshakeTimeoutMs(long pipeSinkHandshakeTimeoutMs) {
+    final int fPipeConnectorHandshakeTimeoutMs = 
this.pipeSinkHandshakeTimeoutMs;
     try {
-      this.pipeConnectorHandshakeTimeoutMs = 
Math.toIntExact(pipeConnectorHandshakeTimeoutMs);
+      this.pipeSinkHandshakeTimeoutMs = 
Math.toIntExact(pipeSinkHandshakeTimeoutMs);
     } catch (ArithmeticException e) {
-      this.pipeConnectorHandshakeTimeoutMs = Integer.MAX_VALUE;
+      this.pipeSinkHandshakeTimeoutMs = Integer.MAX_VALUE;
       logger.warn(
           "Given pipe connector handshake timeout is too large, set to {} 
ms.", Integer.MAX_VALUE);
     } finally {
-      if (fPipeConnectorHandshakeTimeoutMs != 
this.pipeConnectorHandshakeTimeoutMs) {
-        logger.info(
-            "pipeConnectorHandshakeTimeoutMs is set to {}.", 
fPipeConnectorHandshakeTimeoutMs);
+      if (fPipeConnectorHandshakeTimeoutMs != this.pipeSinkHandshakeTimeoutMs) 
{
+        logger.info("pipeSinkHandshakeTimeoutMs is set to {}.", 
this.pipeSinkHandshakeTimeoutMs);
       }
     }
   }
@@ -1072,55 +1071,54 @@ public class CommonConfig {
     return pipeConnectorRPCThriftCompressionEnabled;
   }
 
-  public void setPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold(
-      int pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold) {
-    if (this.pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold
-        == pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold) {
+  public void setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
+      int pipeAsyncSinkForcedRetryTsFileEventQueueSize) {
+    if (this.pipeAsyncSinkForcedRetryTsFileEventQueueSize
+        == pipeAsyncSinkForcedRetryTsFileEventQueueSize) {
       return;
     }
-    this.pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold =
-        pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold;
+    this.pipeAsyncSinkForcedRetryTsFileEventQueueSize =
+        pipeAsyncSinkForcedRetryTsFileEventQueueSize;
     logger.info(
-        "pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold is set to 
{}.",
-        pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold);
+        "pipeAsyncSinkForcedRetryTsFileEventQueueSize is set to {}.",
+        pipeAsyncSinkForcedRetryTsFileEventQueueSize);
   }
 
-  public int getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold() {
-    return pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold;
+  public int getPipeAsyncSinkForcedRetryTsFileEventQueueSize() {
+    return pipeAsyncSinkForcedRetryTsFileEventQueueSize;
   }
 
-  public void setPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold(
-      int pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold) {
-    if (this.pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold
-        == pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold) {
+  public void setPipeAsyncSinkForcedRetryTabletEventQueueSize(
+      int pipeAsyncSinkForcedRetryTabletEventQueueSize) {
+    if (this.pipeAsyncSinkForcedRetryTabletEventQueueSize
+        == pipeAsyncSinkForcedRetryTabletEventQueueSize) {
       return;
     }
-    this.pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold =
-        pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold;
+    this.pipeAsyncSinkForcedRetryTabletEventQueueSize =
+        pipeAsyncSinkForcedRetryTabletEventQueueSize;
     logger.info(
-        "pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold is set to 
{}.",
-        pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold);
+        "pipeAsyncSinkForcedRetryTabletEventQueueSize is set to {}.",
+        pipeAsyncSinkForcedRetryTabletEventQueueSize);
   }
 
-  public int getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold() {
-    return pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold;
+  public int getPipeAsyncSinkForcedRetryTabletEventQueueSize() {
+    return pipeAsyncSinkForcedRetryTabletEventQueueSize;
   }
 
-  public void setPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold(
-      int pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold) {
-    if (this.pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold
-        == pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold) {
+  public void setPipeAsyncSinkForcedRetryTotalEventQueueSize(
+      int pipeAsyncSinkForcedRetryTotalEventQueueSize) {
+    if (this.pipeAsyncSinkForcedRetryTotalEventQueueSize
+        == pipeAsyncSinkForcedRetryTotalEventQueueSize) {
       return;
     }
-    this.pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold =
-        pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold;
+    this.pipeAsyncSinkForcedRetryTotalEventQueueSize = 
pipeAsyncSinkForcedRetryTotalEventQueueSize;
     logger.info(
-        "pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold is set to 
{}.",
-        pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold);
+        "pipeAsyncSinkForcedRetryTotalEventQueueSize is set to {}.",
+        pipeAsyncSinkForcedRetryTotalEventQueueSize);
   }
 
-  public int getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold() {
-    return pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold;
+  public int getPipeAsyncSinkForcedRetryTotalEventQueueSize() {
+    return pipeAsyncSinkForcedRetryTotalEventQueueSize;
   }
 
   public void setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 2cd745a5e67..1ab26170d19 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -159,8 +159,8 @@ public class PipeConfig {
 
   /////////////////////////////// Connector ///////////////////////////////
 
-  public int getPipeConnectorHandshakeTimeoutMs() {
-    return COMMON_CONFIG.getPipeConnectorHandshakeTimeoutMs();
+  public int getPipeSinkHandshakeTimeoutMs() {
+    return COMMON_CONFIG.getPipeSinkHandshakeTimeoutMs();
   }
 
   public int getPipeConnectorTransferTimeoutMs() {
@@ -183,16 +183,16 @@ public class PipeConfig {
     return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled();
   }
 
-  public int getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold() {
-    return 
COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold();
+  public int getPipeAsyncSinkForcedRetryTsFileEventQueueSize() {
+    return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTsFileEventQueueSize();
   }
 
-  public int getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold() {
-    return 
COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold();
+  public int getPipeAsyncSinkForcedRetryTabletEventQueueSize() {
+    return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTabletEventQueueSize();
   }
 
-  public int getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold() {
-    return 
COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold();
+  public int getPipeAsyncSinkForcedRetryTotalEventQueueSize() {
+    return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTotalEventQueueSize();
   }
 
   public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
@@ -493,7 +493,7 @@ public class PipeConfig {
         getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes());
     LOGGER.info("PipeSourceMatcherCacheSize: {}", 
getPipeSourceMatcherCacheSize());
 
-    LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}", 
getPipeConnectorHandshakeTimeoutMs());
+    LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}", 
getPipeSinkHandshakeTimeoutMs());
     LOGGER.info("PipeConnectorTransferTimeoutMs: {}", 
getPipeConnectorTransferTimeoutMs());
     LOGGER.info("PipeConnectorReadFileBufferSize: {}", 
getPipeConnectorReadFileBufferSize());
     LOGGER.info(
@@ -540,15 +540,14 @@ public class PipeConfig {
         getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold());
 
     LOGGER.info(
-        "PipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold: {}",
-        getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold());
+        "PipeAsyncSinkForcedRetryTsFileEventQueueSize: {}",
+        getPipeAsyncSinkForcedRetryTsFileEventQueueSize());
     LOGGER.info(
-        "PipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold: {}",
-        getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold());
+        "PipeAsyncSinkForcedRetryTabletEventQueueSize: {}",
+        getPipeAsyncSinkForcedRetryTabletEventQueueSize());
     LOGGER.info(
-        "PipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold: {}",
-        getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold());
-
+        "PipeAsyncSinkForcedRetryTotalEventQueueSize: {}",
+        getPipeAsyncSinkForcedRetryTotalEventQueueSize());
     LOGGER.info(
         "PipeAsyncConnectorMaxRetryExecutionTimeMsPerCall: {}",
         getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index a0b29a5f716..ec6860be3fb 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -323,13 +323,13 @@ public class PipeDescriptor {
                         "pipe_extractor_matcher_cache_size",
                         
String.valueOf(config.getPipeSourceMatcherCacheSize())))));
 
-    config.setPipeConnectorHandshakeTimeoutMs(
+    config.setPipeSinkHandshakeTimeoutMs(
         Long.parseLong(
             
Optional.ofNullable(properties.getProperty("pipe_sink_handshake_timeout_ms"))
                 .orElse(
                     properties.getProperty(
                         "pipe_connector_handshake_timeout_ms",
-                        
String.valueOf(config.getPipeConnectorHandshakeTimeoutMs())))));
+                        
String.valueOf(config.getPipeSinkHandshakeTimeoutMs())))));
     config.setPipeConnectorReadFileBufferSize(
         Integer.parseInt(
             
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_size"))
@@ -368,7 +368,7 @@ public class PipeDescriptor {
                         
"pipe_async_connector_max_retry_execution_time_ms_per_call",
                         String.valueOf(
                             
config.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())))));
-    config.setPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold(
+    config.setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
         Integer.parseInt(
             Optional.ofNullable(
                     
properties.getProperty("pipe_async_sink_forced_retry_tsfile_event_queue_size"))
@@ -376,9 +376,8 @@ public class PipeDescriptor {
                     properties.getProperty(
                         
"pipe_async_connector_forced_retry_tsfile_event_queue_size",
                         String.valueOf(
-                            config
-                                
.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold())))));
-    config.setPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold(
+                            
config.getPipeAsyncSinkForcedRetryTsFileEventQueueSize())))));
+    config.setPipeAsyncSinkForcedRetryTabletEventQueueSize(
         Integer.parseInt(
             Optional.ofNullable(
                     
properties.getProperty("pipe_async_sink_forced_retry_tablet_event_queue_size"))
@@ -386,18 +385,15 @@ public class PipeDescriptor {
                     properties.getProperty(
                         
"pipe_async_connector_forced_retry_tablet_event_queue_size",
                         String.valueOf(
-                            config
-                                
.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold())))));
-    config.setPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold(
+                            
config.getPipeAsyncSinkForcedRetryTabletEventQueueSize())))));
+    config.setPipeAsyncSinkForcedRetryTotalEventQueueSize(
         Integer.parseInt(
             Optional.ofNullable(
                     
properties.getProperty("pipe_async_sink_forced_retry_total_event_queue_size"))
                 .orElse(
                     properties.getProperty(
                         
"pipe_async_connector_forced_retry_total_event_queue_size",
-                        String.valueOf(
-                            config
-                                
.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold())))));
+                        
String.valueOf(config.getPipeAsyncSinkForcedRetryTotalEventQueueSize())))));
     config.setRateLimiterHotReloadCheckIntervalMs(
         Integer.parseInt(
             properties.getProperty(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
index 7dc92b8cba2..fa5f0d3383c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
@@ -193,7 +193,7 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
       clientAndStatus.setLeft(
           new IoTDBSyncClient(
               new ThriftClientProperty.Builder()
-                  
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
+                  
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs())
                   .setRpcThriftCompressionEnabled(
                       PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
                   .build(),

Reply via email to