This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a6191d916b2 Pipe: Modify the Pipe configuration item log name to be 
consistent with the Properties name. (#16732)
a6191d916b2 is described below

commit a6191d916b2ceca7f3b22e728ae4117b99290261
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Nov 14 15:06:44 2025 +0800

    Pipe: Modify the Pipe configuration item log name to be consistent with the 
Properties name. (#16732)
    
    * Pipe: Modify the Pipe configuration item log name to be consistent with 
the Properties name.
    
    * update
---
 .../thrift/async/IoTDBDataRegionAsyncSink.java     | 18 ++----
 .../apache/iotdb/commons/conf/CommonConfig.java    | 67 +++++++++++-----------
 .../iotdb/commons/pipe/config/PipeConfig.java      | 18 +++---
 .../iotdb/commons/pipe/config/PipeDescriptor.java  | 16 ++----
 4 files changed, 54 insertions(+), 65 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 567715e7292..e6e368a5280 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -539,14 +539,11 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
     if ((retryEventQueue.isEmpty() && retryTsFileQueue.isEmpty())
         || (!forced
             && retryEventQueueEventCounter.getTabletInsertionEventCount()
-                < PipeConfig.getInstance()
-                    
.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold()
+                < 
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTabletEventQueueSize()
             && retryEventQueueEventCounter.getTsFileInsertionEventCount()
-                < PipeConfig.getInstance()
-                    
.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold()
+                < 
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTsFileEventQueueSize()
             && retryEventQueue.size() + retryTsFileQueue.size()
-                < PipeConfig.getInstance()
-                    
.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold())) {
+                < 
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTotalEventQueueSize())) {
       return;
     }
 
@@ -604,14 +601,11 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
       if (System.currentTimeMillis() - retryStartTime
           > 
PipeConfig.getInstance().getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall()) 
{
         if (retryEventQueueEventCounter.getTabletInsertionEventCount()
-                < PipeConfig.getInstance()
-                    
.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold()
+                < 
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTabletEventQueueSize()
             && retryEventQueueEventCounter.getTsFileInsertionEventCount()
-                < PipeConfig.getInstance()
-                    
.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold()
+                < 
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTsFileEventQueueSize()
             && retryEventQueue.size() + retryTsFileQueue.size()
-                < PipeConfig.getInstance()
-                    
.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold()) {
+                < 
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTotalEventQueueSize()) {
           return;
         }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 9102a117eca..d83e96eb100 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -269,9 +269,9 @@ public class CommonConfig {
   private long pipeConnectorRetryIntervalMs = 1000L;
   private boolean pipeConnectorRPCThriftCompressionEnabled = false;
 
-  private int pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold = 5;
-  private int pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold = 20;
-  private int pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold = 30;
+  private int pipeAsyncSinkForcedRetryTsFileEventQueueSize = 5;
+  private int pipeAsyncSinkForcedRetryTabletEventQueueSize = 20;
+  private int pipeAsyncSinkForcedRetryTotalEventQueueSize = 30;
   private long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall = 500;
   private int pipeAsyncConnectorSelectorNumber =
       Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
@@ -1058,7 +1058,7 @@ public class CommonConfig {
     } finally {
       if (fPipeConnectorHandshakeTimeoutMs != 
this.pipeConnectorHandshakeTimeoutMs) {
         logger.info(
-            "pipeConnectorHandshakeTimeoutMs is set to {}.", 
fPipeConnectorHandshakeTimeoutMs);
+            "pipeConnectorHandshakeTimeoutMs is set to {}.", 
this.pipeConnectorHandshakeTimeoutMs);
       }
     }
   }
@@ -1127,55 +1127,54 @@ public class CommonConfig {
     return pipeConnectorRPCThriftCompressionEnabled;
   }
 
-  public void setPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold(
-      int pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold) {
-    if (this.pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold
-        == pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold) {
+  public void setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
+      int pipeAsyncSinkForcedRetryTsFileEventQueueSize) {
+    if (this.pipeAsyncSinkForcedRetryTsFileEventQueueSize
+        == pipeAsyncSinkForcedRetryTsFileEventQueueSize) {
       return;
     }
-    this.pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold =
-        pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold;
+    this.pipeAsyncSinkForcedRetryTsFileEventQueueSize =
+        pipeAsyncSinkForcedRetryTsFileEventQueueSize;
     logger.info(
-        "pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold is set to 
{}.",
-        pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold);
+        "pipeAsyncSinkForcedRetryTsFileEventQueueSize is set to {}.",
+        pipeAsyncSinkForcedRetryTsFileEventQueueSize);
   }
 
-  public int getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold() {
-    return pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold;
+  public int getPipeAsyncSinkForcedRetryTsFileEventQueueSize() {
+    return pipeAsyncSinkForcedRetryTsFileEventQueueSize;
   }
 
-  public void setPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold(
-      int pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold) {
-    if (this.pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold
-        == pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold) {
+  public void setPipeAsyncSinkForcedRetryTabletEventQueueSize(
+      int pipeAsyncSinkForcedRetryTabletEventQueueSize) {
+    if (this.pipeAsyncSinkForcedRetryTabletEventQueueSize
+        == pipeAsyncSinkForcedRetryTabletEventQueueSize) {
       return;
     }
-    this.pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold =
-        pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold;
+    this.pipeAsyncSinkForcedRetryTabletEventQueueSize =
+        pipeAsyncSinkForcedRetryTabletEventQueueSize;
     logger.info(
-        "pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold is set to 
{}.",
-        pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold);
+        "pipeAsyncSinkForcedRetryTabletEventQueueSize is set to {}.",
+        pipeAsyncSinkForcedRetryTabletEventQueueSize);
   }
 
-  public int getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold() {
-    return pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold;
+  public int getPipeAsyncSinkForcedRetryTabletEventQueueSize() {
+    return pipeAsyncSinkForcedRetryTabletEventQueueSize;
   }
 
-  public void setPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold(
-      int pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold) {
-    if (this.pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold
-        == pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold) {
+  public void setPipeAsyncSinkForcedRetryTotalEventQueueSize(
+      int pipeAsyncSinkForcedRetryTotalEventQueueSize) {
+    if (this.pipeAsyncSinkForcedRetryTotalEventQueueSize
+        == pipeAsyncSinkForcedRetryTotalEventQueueSize) {
       return;
     }
-    this.pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold =
-        pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold;
+    this.pipeAsyncSinkForcedRetryTotalEventQueueSize = 
pipeAsyncSinkForcedRetryTotalEventQueueSize;
     logger.info(
-        "pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold is set to 
{}.",
-        pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold);
+        "pipeAsyncSinkForcedRetryTotalEventQueueSize is set to {}.",
+        pipeAsyncSinkForcedRetryTotalEventQueueSize);
   }
 
-  public int getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold() {
-    return pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold;
+  public int getPipeAsyncSinkForcedRetryTotalEventQueueSize() {
+    return pipeAsyncSinkForcedRetryTotalEventQueueSize;
   }
 
   public void setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 637628149b6..ce09fb1f291 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -183,16 +183,16 @@ public class PipeConfig {
     return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled();
   }
 
-  public int getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold() {
-    return 
COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold();
+  public int getPipeAsyncSinkForcedRetryTsFileEventQueueSize() {
+    return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTsFileEventQueueSize();
   }
 
-  public int getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold() {
-    return 
COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold();
+  public int getPipeAsyncSinkForcedRetryTabletEventQueueSize() {
+    return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTabletEventQueueSize();
   }
 
-  public int getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold() {
-    return 
COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold();
+  public int getPipeAsyncSinkForcedRetryTotalEventQueueSize() {
+    return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTotalEventQueueSize();
   }
 
   public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
@@ -541,13 +541,13 @@ public class PipeConfig {
 
     LOGGER.info(
         "PipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold: {}",
-        getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold());
+        getPipeAsyncSinkForcedRetryTsFileEventQueueSize());
     LOGGER.info(
         "PipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold: {}",
-        getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold());
+        getPipeAsyncSinkForcedRetryTabletEventQueueSize());
     LOGGER.info(
         "PipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold: {}",
-        getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold());
+        getPipeAsyncSinkForcedRetryTotalEventQueueSize());
     LOGGER.info(
         "PipeAsyncConnectorMaxRetryExecutionTimeMsPerCall: {}",
         getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 77aae8a3252..928ff5f25a5 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -368,7 +368,7 @@ public class PipeDescriptor {
                         
"pipe_async_connector_max_retry_execution_time_ms_per_call",
                         String.valueOf(
                             
config.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())))));
-    config.setPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold(
+    config.setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
         Integer.parseInt(
             Optional.ofNullable(
                     
properties.getProperty("pipe_async_sink_forced_retry_tsfile_event_queue_size"))
@@ -376,9 +376,8 @@ public class PipeDescriptor {
                     properties.getProperty(
                         
"pipe_async_connector_forced_retry_tsfile_event_queue_size",
                         String.valueOf(
-                            config
-                                
.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold())))));
-    config.setPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold(
+                            
config.getPipeAsyncSinkForcedRetryTsFileEventQueueSize())))));
+    config.setPipeAsyncSinkForcedRetryTabletEventQueueSize(
         Integer.parseInt(
             Optional.ofNullable(
                     
properties.getProperty("pipe_async_sink_forced_retry_tablet_event_queue_size"))
@@ -386,18 +385,15 @@ public class PipeDescriptor {
                     properties.getProperty(
                         
"pipe_async_connector_forced_retry_tablet_event_queue_size",
                         String.valueOf(
-                            config
-                                
.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold())))));
-    config.setPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold(
+                            
config.getPipeAsyncSinkForcedRetryTabletEventQueueSize())))));
+    config.setPipeAsyncSinkForcedRetryTotalEventQueueSize(
         Integer.parseInt(
             Optional.ofNullable(
                     
properties.getProperty("pipe_async_sink_forced_retry_total_event_queue_size"))
                 .orElse(
                     properties.getProperty(
                         
"pipe_async_connector_forced_retry_total_event_queue_size",
-                        String.valueOf(
-                            config
-                                
.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold())))));
+                        
String.valueOf(config.getPipeAsyncSinkForcedRetryTotalEventQueueSize())))));
     config.setRateLimiterHotReloadCheckIntervalMs(
         Integer.parseInt(
             properties.getProperty(

Reply via email to