This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 440935489cc Pipe: Changed the property files' "extractor" and
"connector" related parameters to their "source" and "sink" counterparts
(#11751)
440935489cc is described below
commit 440935489cc3de93e0f7eb2b5d2970703e64f359
Author: Caideyipi <[email protected]>
AuthorDate: Thu Dec 21 11:16:01 2023 +0800
Pipe: Changed the property files' "extractor" and "connector" related
parameters to their "source" and "sink" counterparts (#11751)
Old:
```
# The connection timeout (in milliseconds) for the thrift client.
# pipe_connector_timeout_ms=900000
# The maximum number of selectors that can be used in the async connector.
# pipe_async_connector_selector_number=1
# The core number of clients that can be used in the async connector.
# pipe_async_connector_core_client_number=8
# The maximum number of clients that can be used in the async connector.
# pipe_async_connector_max_client_number=16
```
New:
```
# The connection timeout (in milliseconds) for the thrift client.
# pipe_sink_timeout_ms=900000
# The maximum number of selectors that can be used in the sink.
# pipe_sink_selector_number=1
# The core number of clients that can be used in the sink.
# pipe_sink_core_client_number=8
# The maximum number of clients that can be used in the sink.
# pipe_sink_max_client_number=16
```
Old parameters can also be parsed and applied.
---
.../resources/conf/iotdb-common.properties | 14 +--
.../iotdb/commons/conf/CommonDescriptor.java | 104 +++++++++++++--------
2 files changed, 74 insertions(+), 44 deletions(-)
diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index ce0a26bfa61..e67158a1ed1 100644
--- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -956,16 +956,16 @@ timestamp_precision=ms
# pipe_subtask_executor_max_thread_num=5
# The connection timeout (in milliseconds) for the thrift client.
-# pipe_connector_timeout_ms=900000
+# pipe_sink_timeout_ms=900000
-# The maximum number of selectors that can be used in the async connector.
-# pipe_async_connector_selector_number=1
+# The maximum number of selectors that can be used in the sink.
+# pipe_sink_selector_number=1
-# The core number of clients that can be used in the async connector.
-# pipe_async_connector_core_client_number=8
+# The core number of clients that can be used in the sink.
+# pipe_sink_core_client_number=8
-# The maximum number of clients that can be used in the async connector.
-# pipe_async_connector_max_client_number=16
+# The maximum number of clients that can be used in the sink.
+# pipe_sink_max_client_number=16
# Whether to enable receiving pipe data through air gap.
# The receiver can only return 0 or 1 in tcp mode to indicate whether the data is received successfully.
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 3b873571f7f..cd9404bb4e7 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import java.io.File;
+import java.util.Optional;
import java.util.Properties;
public class CommonDescriptor {
@@ -307,67 +308,96 @@ public class CommonDescriptor {
config.setPipeExtractorAssignerDisruptorRingBufferSize(
Integer.parseInt(
- properties.getProperty(
- "pipe_extractor_assigner_disruptor_ring_buffer_size",
-
String.valueOf(config.getPipeExtractorAssignerDisruptorRingBufferSize()))));
+ Optional.ofNullable(
+
properties.getProperty("pipe_source_assigner_disruptor_ring_buffer_size"))
+ .orElse(
+ properties.getProperty(
+ "pipe_extractor_assigner_disruptor_ring_buffer_size",
+ String.valueOf(
+
config.getPipeExtractorAssignerDisruptorRingBufferSize())))));
config.setPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes( // 1MB
Integer.parseInt(
- properties.getProperty(
-
"pipe_extractor_assigner_disruptor_ring_buffer_entry_size_in_bytes",
- String.valueOf(
-
config.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes()))));
+ Optional.ofNullable(
+ properties.getProperty(
+
"pipe_source_assigner_disruptor_ring_buffer_entry_size_in_bytes"))
+ .orElse(
+ properties.getProperty(
+
"pipe_extractor_assigner_disruptor_ring_buffer_entry_size_in_bytes",
+ String.valueOf(
+ config
+
.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes())))));
config.setPipeExtractorMatcherCacheSize(
Integer.parseInt(
- properties.getProperty(
- "pipe_extractor_matcher_cache_size",
- String.valueOf(config.getPipeExtractorMatcherCacheSize()))));
+
Optional.ofNullable(properties.getProperty("pipe_source_matcher_cache_size"))
+ .orElse(
+ properties.getProperty(
+ "pipe_extractor_matcher_cache_size",
+
String.valueOf(config.getPipeExtractorMatcherCacheSize())))));
config.setPipeConnectorHandshakeTimeoutMs(
Long.parseLong(
- properties.getProperty(
- "pipe_connector_handshake_timeout_ms",
- String.valueOf(config.getPipeConnectorHandshakeTimeoutMs()))));
+
Optional.ofNullable(properties.getProperty("pipe_sink_handshake_timeout_ms"))
+ .orElse(
+ properties.getProperty(
+ "pipe_connector_handshake_timeout_ms",
+
String.valueOf(config.getPipeConnectorHandshakeTimeoutMs())))));
config.setPipeConnectorTransferTimeoutMs(
Long.parseLong(
- properties.getProperty(
- "pipe_connector_timeout_ms",
- String.valueOf(config.getPipeConnectorTransferTimeoutMs()))));
+ Optional.ofNullable(properties.getProperty("pipe_sink_timeout_ms"))
+ .orElse(
+ properties.getProperty(
+ "pipe_connector_timeout_ms",
+
String.valueOf(config.getPipeConnectorTransferTimeoutMs())))));
config.setPipeConnectorReadFileBufferSize(
Integer.parseInt(
- properties.getProperty(
- "pipe_connector_read_file_buffer_size",
- String.valueOf(config.getPipeConnectorReadFileBufferSize()))));
+
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_size"))
+ .orElse(
+ properties.getProperty(
+ "pipe_connector_read_file_buffer_size",
+
String.valueOf(config.getPipeConnectorReadFileBufferSize())))));
config.setPipeConnectorRetryIntervalMs(
Long.parseLong(
- properties.getProperty(
- "pipe_connector_retry_interval_ms",
- String.valueOf(config.getPipeConnectorRetryIntervalMs()))));
+
Optional.ofNullable(properties.getProperty("pipe_sink_retry_interval_ms"))
+ .orElse(
+ properties.getProperty(
+ "pipe_connector_retry_interval_ms",
+
String.valueOf(config.getPipeConnectorRetryIntervalMs())))));
config.setPipeConnectorPendingQueueSize(
Integer.parseInt(
- properties.getProperty(
- "pipe_connector_pending_queue_size",
- String.valueOf(config.getPipeConnectorPendingQueueSize()))));
+
Optional.ofNullable(properties.getProperty("pipe_sink_pending_queue_size"))
+ .orElse(
+ properties.getProperty(
+ "pipe_connector_pending_queue_size",
+
String.valueOf(config.getPipeConnectorPendingQueueSize())))));
config.setPipeConnectorRPCThriftCompressionEnabled(
Boolean.parseBoolean(
- properties.getProperty(
- "pipe_connector_rpc_thrift_compression_enabled",
-
String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled()))));
+
Optional.ofNullable(properties.getProperty("pipe_sink_rpc_thrift_compression_enabled"))
+ .orElse(
+ properties.getProperty(
+ "pipe_connector_rpc_thrift_compression_enabled",
+
String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled())))));
config.setPipeAsyncConnectorSelectorNumber(
Integer.parseInt(
- properties.getProperty(
- "pipe_async_connector_selector_number",
-
String.valueOf(config.getPipeAsyncConnectorSelectorNumber()))));
+
Optional.ofNullable(properties.getProperty("pipe_sink_selector_number"))
+ .orElse(
+ properties.getProperty(
+ "pipe_async_connector_selector_number",
+
String.valueOf(config.getPipeAsyncConnectorSelectorNumber())))));
config.setPipeAsyncConnectorCoreClientNumber(
Integer.parseInt(
- properties.getProperty(
- "pipe_async_connector_core_client_number",
-
String.valueOf(config.getPipeAsyncConnectorCoreClientNumber()))));
+
Optional.ofNullable(properties.getProperty("pipe_sink_core_client_number"))
+ .orElse(
+ properties.getProperty(
+ "pipe_async_connector_core_client_number",
+
String.valueOf(config.getPipeAsyncConnectorCoreClientNumber())))));
config.setPipeAsyncConnectorMaxClientNumber(
Integer.parseInt(
- properties.getProperty(
- "pipe_async_connector_max_client_number",
-
String.valueOf(config.getPipeAsyncConnectorMaxClientNumber()))));
+
Optional.ofNullable(properties.getProperty("pipe_sink_max_client_number"))
+ .orElse(
+ properties.getProperty(
+ "pipe_async_connector_max_client_number",
+
String.valueOf(config.getPipeAsyncConnectorMaxClientNumber())))));
config.setSeperatedPipeHeartbeatEnabled(
Boolean.parseBoolean(