This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 440935489cc Pipe: Changed the property files' "extractor" and 
"connector" related parameters to their "source" and "sink" counterparts 
(#11751)
440935489cc is described below

commit 440935489cc3de93e0f7eb2b5d2970703e64f359
Author: Caideyipi <[email protected]>
AuthorDate: Thu Dec 21 11:16:01 2023 +0800

    Pipe: Changed the property files' "extractor" and "connector" related 
parameters to their "source" and "sink" counterparts (#11751)
    
    Old:
    
    ```
    # The connection timeout (in milliseconds) for the thrift client.
    # pipe_connector_timeout_ms=900000
    
    # The maximum number of selectors that can be used in the async connector.
    # pipe_async_connector_selector_number=1
    
    # The core number of clients that can be used in the async connector.
    # pipe_async_connector_core_client_number=8
    
    # The maximum number of clients that can be used in the async connector.
    # pipe_async_connector_max_client_number=16
    ```
    
    New:
    ```
    # The connection timeout (in milliseconds) for the thrift client.
    # pipe_sink_timeout_ms=900000
    
    # The maximum number of selectors that can be used in the sink.
    # pipe_sink_selector_number=1
    
    # The core number of clients that can be used in the sink.
    # pipe_sink_core_client_number=8
    
    # The maximum number of clients that can be used in the sink.
    # pipe_sink_max_client_number=16
    ```
    
    Old parameters can also be parsed and applie
---
 .../resources/conf/iotdb-common.properties         |  14 +--
 .../iotdb/commons/conf/CommonDescriptor.java       | 104 +++++++++++++--------
 2 files changed, 74 insertions(+), 44 deletions(-)

diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index ce0a26bfa61..e67158a1ed1 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -956,16 +956,16 @@ timestamp_precision=ms
 # pipe_subtask_executor_max_thread_num=5
 
 # The connection timeout (in milliseconds) for the thrift client.
-# pipe_connector_timeout_ms=900000
+# pipe_sink_timeout_ms=900000
 
-# The maximum number of selectors that can be used in the async connector.
-# pipe_async_connector_selector_number=1
+# The maximum number of selectors that can be used in the sink.
+# pipe_sink_selector_number=1
 
-# The core number of clients that can be used in the async connector.
-# pipe_async_connector_core_client_number=8
+# The core number of clients that can be used in the sink.
+# pipe_sink_core_client_number=8
 
-# The maximum number of clients that can be used in the async connector.
-# pipe_async_connector_max_client_number=16
+# The maximum number of clients that can be used in the sink.
+# pipe_sink_max_client_number=16
 
 # Whether to enable receiving pipe data through air gap.
 # The receiver can only return 0 or 1 in tcp mode to indicate whether the data 
is received successfully.
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 3b873571f7f..cd9404bb4e7 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
 
 import java.io.File;
+import java.util.Optional;
 import java.util.Properties;
 
 public class CommonDescriptor {
@@ -307,67 +308,96 @@ public class CommonDescriptor {
 
     config.setPipeExtractorAssignerDisruptorRingBufferSize(
         Integer.parseInt(
-            properties.getProperty(
-                "pipe_extractor_assigner_disruptor_ring_buffer_size",
-                
String.valueOf(config.getPipeExtractorAssignerDisruptorRingBufferSize()))));
+            Optional.ofNullable(
+                    
properties.getProperty("pipe_source_assigner_disruptor_ring_buffer_size"))
+                .orElse(
+                    properties.getProperty(
+                        "pipe_extractor_assigner_disruptor_ring_buffer_size",
+                        String.valueOf(
+                            
config.getPipeExtractorAssignerDisruptorRingBufferSize())))));
     config.setPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes( // 1MB
         Integer.parseInt(
-            properties.getProperty(
-                
"pipe_extractor_assigner_disruptor_ring_buffer_entry_size_in_bytes",
-                String.valueOf(
-                    
config.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes()))));
+            Optional.ofNullable(
+                    properties.getProperty(
+                        
"pipe_source_assigner_disruptor_ring_buffer_entry_size_in_bytes"))
+                .orElse(
+                    properties.getProperty(
+                        
"pipe_extractor_assigner_disruptor_ring_buffer_entry_size_in_bytes",
+                        String.valueOf(
+                            config
+                                
.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes())))));
     config.setPipeExtractorMatcherCacheSize(
         Integer.parseInt(
-            properties.getProperty(
-                "pipe_extractor_matcher_cache_size",
-                String.valueOf(config.getPipeExtractorMatcherCacheSize()))));
+            
Optional.ofNullable(properties.getProperty("pipe_source_matcher_cache_size"))
+                .orElse(
+                    properties.getProperty(
+                        "pipe_extractor_matcher_cache_size",
+                        
String.valueOf(config.getPipeExtractorMatcherCacheSize())))));
 
     config.setPipeConnectorHandshakeTimeoutMs(
         Long.parseLong(
-            properties.getProperty(
-                "pipe_connector_handshake_timeout_ms",
-                String.valueOf(config.getPipeConnectorHandshakeTimeoutMs()))));
+            
Optional.ofNullable(properties.getProperty("pipe_sink_handshake_timeout_ms"))
+                .orElse(
+                    properties.getProperty(
+                        "pipe_connector_handshake_timeout_ms",
+                        
String.valueOf(config.getPipeConnectorHandshakeTimeoutMs())))));
     config.setPipeConnectorTransferTimeoutMs(
         Long.parseLong(
-            properties.getProperty(
-                "pipe_connector_timeout_ms",
-                String.valueOf(config.getPipeConnectorTransferTimeoutMs()))));
+            Optional.ofNullable(properties.getProperty("pipe_sink_timeout_ms"))
+                .orElse(
+                    properties.getProperty(
+                        "pipe_connector_timeout_ms",
+                        
String.valueOf(config.getPipeConnectorTransferTimeoutMs())))));
     config.setPipeConnectorReadFileBufferSize(
         Integer.parseInt(
-            properties.getProperty(
-                "pipe_connector_read_file_buffer_size",
-                String.valueOf(config.getPipeConnectorReadFileBufferSize()))));
+            
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_size"))
+                .orElse(
+                    properties.getProperty(
+                        "pipe_connector_read_file_buffer_size",
+                        
String.valueOf(config.getPipeConnectorReadFileBufferSize())))));
     config.setPipeConnectorRetryIntervalMs(
         Long.parseLong(
-            properties.getProperty(
-                "pipe_connector_retry_interval_ms",
-                String.valueOf(config.getPipeConnectorRetryIntervalMs()))));
+            
Optional.ofNullable(properties.getProperty("pipe_sink_retry_interval_ms"))
+                .orElse(
+                    properties.getProperty(
+                        "pipe_connector_retry_interval_ms",
+                        
String.valueOf(config.getPipeConnectorRetryIntervalMs())))));
     config.setPipeConnectorPendingQueueSize(
         Integer.parseInt(
-            properties.getProperty(
-                "pipe_connector_pending_queue_size",
-                String.valueOf(config.getPipeConnectorPendingQueueSize()))));
+            
Optional.ofNullable(properties.getProperty("pipe_sink_pending_queue_size"))
+                .orElse(
+                    properties.getProperty(
+                        "pipe_connector_pending_queue_size",
+                        
String.valueOf(config.getPipeConnectorPendingQueueSize())))));
     config.setPipeConnectorRPCThriftCompressionEnabled(
         Boolean.parseBoolean(
-            properties.getProperty(
-                "pipe_connector_rpc_thrift_compression_enabled",
-                
String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled()))));
+            
Optional.ofNullable(properties.getProperty("pipe_sink_rpc_thrift_compression_enabled"))
+                .orElse(
+                    properties.getProperty(
+                        "pipe_connector_rpc_thrift_compression_enabled",
+                        
String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled())))));
 
     config.setPipeAsyncConnectorSelectorNumber(
         Integer.parseInt(
-            properties.getProperty(
-                "pipe_async_connector_selector_number",
-                
String.valueOf(config.getPipeAsyncConnectorSelectorNumber()))));
+            
Optional.ofNullable(properties.getProperty("pipe_sink_selector_number"))
+                .orElse(
+                    properties.getProperty(
+                        "pipe_async_connector_selector_number",
+                        
String.valueOf(config.getPipeAsyncConnectorSelectorNumber())))));
     config.setPipeAsyncConnectorCoreClientNumber(
         Integer.parseInt(
-            properties.getProperty(
-                "pipe_async_connector_core_client_number",
-                
String.valueOf(config.getPipeAsyncConnectorCoreClientNumber()))));
+            
Optional.ofNullable(properties.getProperty("pipe_sink_core_client_number"))
+                .orElse(
+                    properties.getProperty(
+                        "pipe_async_connector_core_client_number",
+                        
String.valueOf(config.getPipeAsyncConnectorCoreClientNumber())))));
     config.setPipeAsyncConnectorMaxClientNumber(
         Integer.parseInt(
-            properties.getProperty(
-                "pipe_async_connector_max_client_number",
-                
String.valueOf(config.getPipeAsyncConnectorMaxClientNumber()))));
+            
Optional.ofNullable(properties.getProperty("pipe_sink_max_client_number"))
+                .orElse(
+                    properties.getProperty(
+                        "pipe_async_connector_max_client_number",
+                        
String.valueOf(config.getPipeAsyncConnectorMaxClientNumber())))));
 
     config.setSeperatedPipeHeartbeatEnabled(
         Boolean.parseBoolean(

Reply via email to