This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch force_ci/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8fe93f2d6228eb02a171e36e37c5c851e0a2309a Author: Zhenyu Luo <[email protected]> AuthorDate: Fri Nov 14 15:06:44 2025 +0800 Pipe: Modify the Pipe configuration item log name to be consistent with the Properties name. (#16732) * Pipe: Modify the Pipe configuration item log name to be consistent with the Properties name. * update (cherry picked from commit a6191d916b2ceca7f3b22e728ae4117b99290261) --- .../thrift/async/IoTDBDataRegionAsyncSink.java | 18 ++---- .../apache/iotdb/commons/conf/CommonConfig.java | 67 +++++++++++----------- .../iotdb/commons/pipe/config/PipeConfig.java | 18 +++--- .../iotdb/commons/pipe/config/PipeDescriptor.java | 16 ++---- 4 files changed, 54 insertions(+), 65 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index 567715e7292..e6e368a5280 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -539,14 +539,11 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { if ((retryEventQueue.isEmpty() && retryTsFileQueue.isEmpty()) || (!forced && retryEventQueueEventCounter.getTabletInsertionEventCount() - < PipeConfig.getInstance() - .getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold() + < PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTabletEventQueueSize() && retryEventQueueEventCounter.getTsFileInsertionEventCount() - < PipeConfig.getInstance() - .getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold() + < PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTsFileEventQueueSize() && retryEventQueue.size() + retryTsFileQueue.size() - < PipeConfig.getInstance() - .getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold())) { + < PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTotalEventQueueSize())) { return; } @@ -604,14 +601,11 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { if (System.currentTimeMillis() - retryStartTime > PipeConfig.getInstance().getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall()) { if (retryEventQueueEventCounter.getTabletInsertionEventCount() - < PipeConfig.getInstance() - .getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold() + < PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTabletEventQueueSize() && retryEventQueueEventCounter.getTsFileInsertionEventCount() - < PipeConfig.getInstance() - .getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold() + < PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTsFileEventQueueSize() && retryEventQueue.size() + retryTsFileQueue.size() - < PipeConfig.getInstance() - .getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold()) { + < PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTotalEventQueueSize()) { return; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 9102a117eca..d83e96eb100 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -269,9 +269,9 @@ public class CommonConfig { private long pipeConnectorRetryIntervalMs = 1000L; private boolean pipeConnectorRPCThriftCompressionEnabled = false; - private int pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold = 5; - private int pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold = 20; - private int pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold = 30; + private int pipeAsyncSinkForcedRetryTsFileEventQueueSize = 5; + private int pipeAsyncSinkForcedRetryTabletEventQueueSize = 20; + private int pipeAsyncSinkForcedRetryTotalEventQueueSize = 30; private long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall = 500; private int pipeAsyncConnectorSelectorNumber = Math.max(4, Runtime.getRuntime().availableProcessors() / 2); @@ -1058,7 +1058,7 @@ public class CommonConfig { } finally { if (fPipeConnectorHandshakeTimeoutMs != this.pipeConnectorHandshakeTimeoutMs) { logger.info( - "pipeConnectorHandshakeTimeoutMs is set to {}.", fPipeConnectorHandshakeTimeoutMs); + "pipeConnectorHandshakeTimeoutMs is set to {}.", this.pipeConnectorHandshakeTimeoutMs); } } } @@ -1127,55 +1127,54 @@ public class CommonConfig { return pipeConnectorRPCThriftCompressionEnabled; } - public void setPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold( - int pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold) { - if (this.pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold - == pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold) { + public void setPipeAsyncSinkForcedRetryTsFileEventQueueSize( + int pipeAsyncSinkForcedRetryTsFileEventQueueSize) { + if (this.pipeAsyncSinkForcedRetryTsFileEventQueueSize + == pipeAsyncSinkForcedRetryTsFileEventQueueSize) { return; } - this.pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold = - pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold; + this.pipeAsyncSinkForcedRetryTsFileEventQueueSize = + pipeAsyncSinkForcedRetryTsFileEventQueueSize; logger.info( - "pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold is set to {}.", - pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold); + "pipeAsyncSinkForcedRetryTsFileEventQueueSize is set to {}.", + pipeAsyncSinkForcedRetryTsFileEventQueueSize); } - public int getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold() { - return pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold; + public int getPipeAsyncSinkForcedRetryTsFileEventQueueSize() { + return pipeAsyncSinkForcedRetryTsFileEventQueueSize; } - public void setPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold( - int pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold) { - if (this.pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold - == pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold) { + public void setPipeAsyncSinkForcedRetryTabletEventQueueSize( + int pipeAsyncSinkForcedRetryTabletEventQueueSize) { + if (this.pipeAsyncSinkForcedRetryTabletEventQueueSize + == pipeAsyncSinkForcedRetryTabletEventQueueSize) { return; } - this.pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold = - pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold; + this.pipeAsyncSinkForcedRetryTabletEventQueueSize = + pipeAsyncSinkForcedRetryTabletEventQueueSize; logger.info( - "pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold is set to {}.", - pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold); + "pipeAsyncSinkForcedRetryTabletEventQueueSize is set to {}.", + pipeAsyncSinkForcedRetryTabletEventQueueSize); } - public int getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold() { - return pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold; + public int getPipeAsyncSinkForcedRetryTabletEventQueueSize() { + return pipeAsyncSinkForcedRetryTabletEventQueueSize; } - public void setPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold( - int pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold) { - if (this.pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold - == pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold) { + public void setPipeAsyncSinkForcedRetryTotalEventQueueSize( + int pipeAsyncSinkForcedRetryTotalEventQueueSize) { + if (this.pipeAsyncSinkForcedRetryTotalEventQueueSize + == pipeAsyncSinkForcedRetryTotalEventQueueSize) { return; } - this.pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold = - pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold; + this.pipeAsyncSinkForcedRetryTotalEventQueueSize = pipeAsyncSinkForcedRetryTotalEventQueueSize; logger.info( - "pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold is set to {}.", - pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold); + "pipeAsyncSinkForcedRetryTotalEventQueueSize is set to {}.", + pipeAsyncSinkForcedRetryTotalEventQueueSize); } - public int getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold() { - return pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold; + public int getPipeAsyncSinkForcedRetryTotalEventQueueSize() { + return pipeAsyncSinkForcedRetryTotalEventQueueSize; } public void setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 637628149b6..ce09fb1f291 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -183,16 +183,16 @@ public class PipeConfig { return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled(); } - public int getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold() { - return COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold(); + public int getPipeAsyncSinkForcedRetryTsFileEventQueueSize() { + return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTsFileEventQueueSize(); } - public int getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold() { - return COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold(); + public int getPipeAsyncSinkForcedRetryTabletEventQueueSize() { + return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTabletEventQueueSize(); } - public int getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold() { - return COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold(); + public int getPipeAsyncSinkForcedRetryTotalEventQueueSize() { + return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTotalEventQueueSize(); } public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() { @@ -541,13 +541,13 @@ public class PipeConfig { LOGGER.info( "PipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold: {}", - getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold()); + getPipeAsyncSinkForcedRetryTsFileEventQueueSize()); LOGGER.info( "PipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold: {}", - getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold()); + getPipeAsyncSinkForcedRetryTabletEventQueueSize()); LOGGER.info( "PipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold: {}", - getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold()); + getPipeAsyncSinkForcedRetryTotalEventQueueSize()); LOGGER.info( "PipeAsyncConnectorMaxRetryExecutionTimeMsPerCall: {}", getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index 77aae8a3252..928ff5f25a5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -368,7 +368,7 @@ public class PipeDescriptor { "pipe_async_connector_max_retry_execution_time_ms_per_call", String.valueOf( config.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall()))))); - config.setPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold( + config.setPipeAsyncSinkForcedRetryTsFileEventQueueSize( Integer.parseInt( Optional.ofNullable( properties.getProperty("pipe_async_sink_forced_retry_tsfile_event_queue_size")) @@ -376,9 +376,8 @@ public class PipeDescriptor { properties.getProperty( "pipe_async_connector_forced_retry_tsfile_event_queue_size", String.valueOf( - config - .getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold()))))); - config.setPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold( + config.getPipeAsyncSinkForcedRetryTsFileEventQueueSize()))))); + config.setPipeAsyncSinkForcedRetryTabletEventQueueSize( Integer.parseInt( Optional.ofNullable( properties.getProperty("pipe_async_sink_forced_retry_tablet_event_queue_size")) @@ -386,18 +385,15 @@ public class PipeDescriptor { properties.getProperty( "pipe_async_connector_forced_retry_tablet_event_queue_size", String.valueOf( - config - .getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold()))))); - config.setPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold( + config.getPipeAsyncSinkForcedRetryTabletEventQueueSize()))))); + config.setPipeAsyncSinkForcedRetryTotalEventQueueSize( Integer.parseInt( Optional.ofNullable( properties.getProperty("pipe_async_sink_forced_retry_total_event_queue_size")) .orElse( properties.getProperty( "pipe_async_connector_forced_retry_total_event_queue_size", - String.valueOf( - config - .getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold()))))); + String.valueOf(config.getPipeAsyncSinkForcedRetryTotalEventQueueSize()))))); config.setRateLimiterHotReloadCheckIntervalMs( Integer.parseInt( properties.getProperty(
