This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a6191d916b2 Pipe: Modify the Pipe configuration item log name to be consistent with the Properties name. (#16732)
a6191d916b2 is described below
commit a6191d916b2ceca7f3b22e728ae4117b99290261
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Nov 14 15:06:44 2025 +0800
Pipe: Modify the Pipe configuration item log name to be consistent with the Properties name. (#16732)
* Pipe: Modify the Pipe configuration item log name to be consistent with the Properties name.
* update
---
.../thrift/async/IoTDBDataRegionAsyncSink.java | 18 ++----
.../apache/iotdb/commons/conf/CommonConfig.java | 67 +++++++++++-----------
.../iotdb/commons/pipe/config/PipeConfig.java | 18 +++---
.../iotdb/commons/pipe/config/PipeDescriptor.java | 16 ++----
4 files changed, 54 insertions(+), 65 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 567715e7292..e6e368a5280 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -539,14 +539,11 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
if ((retryEventQueue.isEmpty() && retryTsFileQueue.isEmpty())
|| (!forced
&& retryEventQueueEventCounter.getTabletInsertionEventCount()
- < PipeConfig.getInstance()
-
.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold()
+ <
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTabletEventQueueSize()
&& retryEventQueueEventCounter.getTsFileInsertionEventCount()
- < PipeConfig.getInstance()
-
.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold()
+ <
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTsFileEventQueueSize()
&& retryEventQueue.size() + retryTsFileQueue.size()
- < PipeConfig.getInstance()
-
.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold())) {
+ <
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTotalEventQueueSize())) {
return;
}
@@ -604,14 +601,11 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
if (System.currentTimeMillis() - retryStartTime
>
PipeConfig.getInstance().getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())
{
if (retryEventQueueEventCounter.getTabletInsertionEventCount()
- < PipeConfig.getInstance()
-
.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold()
+ <
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTabletEventQueueSize()
&& retryEventQueueEventCounter.getTsFileInsertionEventCount()
- < PipeConfig.getInstance()
-
.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold()
+ <
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTsFileEventQueueSize()
&& retryEventQueue.size() + retryTsFileQueue.size()
- < PipeConfig.getInstance()
-
.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold()) {
+ <
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTotalEventQueueSize()) {
return;
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 9102a117eca..d83e96eb100 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -269,9 +269,9 @@ public class CommonConfig {
private long pipeConnectorRetryIntervalMs = 1000L;
private boolean pipeConnectorRPCThriftCompressionEnabled = false;
- private int pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold = 5;
- private int pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold = 20;
- private int pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold = 30;
+ private int pipeAsyncSinkForcedRetryTsFileEventQueueSize = 5;
+ private int pipeAsyncSinkForcedRetryTabletEventQueueSize = 20;
+ private int pipeAsyncSinkForcedRetryTotalEventQueueSize = 30;
private long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall = 500;
private int pipeAsyncConnectorSelectorNumber =
Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
@@ -1058,7 +1058,7 @@ public class CommonConfig {
} finally {
if (fPipeConnectorHandshakeTimeoutMs !=
this.pipeConnectorHandshakeTimeoutMs) {
logger.info(
- "pipeConnectorHandshakeTimeoutMs is set to {}.",
fPipeConnectorHandshakeTimeoutMs);
+ "pipeConnectorHandshakeTimeoutMs is set to {}.",
this.pipeConnectorHandshakeTimeoutMs);
}
}
}
@@ -1127,55 +1127,54 @@ public class CommonConfig {
return pipeConnectorRPCThriftCompressionEnabled;
}
- public void setPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold(
- int pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold) {
- if (this.pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold
- == pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold) {
+ public void setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
+ int pipeAsyncSinkForcedRetryTsFileEventQueueSize) {
+ if (this.pipeAsyncSinkForcedRetryTsFileEventQueueSize
+ == pipeAsyncSinkForcedRetryTsFileEventQueueSize) {
return;
}
- this.pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold =
- pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold;
+ this.pipeAsyncSinkForcedRetryTsFileEventQueueSize =
+ pipeAsyncSinkForcedRetryTsFileEventQueueSize;
logger.info(
- "pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold is set to
{}.",
- pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold);
+ "pipeAsyncSinkForcedRetryTsFileEventQueueSize is set to {}.",
+ pipeAsyncSinkForcedRetryTsFileEventQueueSize);
}
- public int getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold() {
- return pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold;
+ public int getPipeAsyncSinkForcedRetryTsFileEventQueueSize() {
+ return pipeAsyncSinkForcedRetryTsFileEventQueueSize;
}
- public void setPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold(
- int pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold) {
- if (this.pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold
- == pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold) {
+ public void setPipeAsyncSinkForcedRetryTabletEventQueueSize(
+ int pipeAsyncSinkForcedRetryTabletEventQueueSize) {
+ if (this.pipeAsyncSinkForcedRetryTabletEventQueueSize
+ == pipeAsyncSinkForcedRetryTabletEventQueueSize) {
return;
}
- this.pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold =
- pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold;
+ this.pipeAsyncSinkForcedRetryTabletEventQueueSize =
+ pipeAsyncSinkForcedRetryTabletEventQueueSize;
logger.info(
- "pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold is set to
{}.",
- pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold);
+ "pipeAsyncSinkForcedRetryTabletEventQueueSize is set to {}.",
+ pipeAsyncSinkForcedRetryTabletEventQueueSize);
}
- public int getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold() {
- return pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold;
+ public int getPipeAsyncSinkForcedRetryTabletEventQueueSize() {
+ return pipeAsyncSinkForcedRetryTabletEventQueueSize;
}
- public void setPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold(
- int pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold) {
- if (this.pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold
- == pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold) {
+ public void setPipeAsyncSinkForcedRetryTotalEventQueueSize(
+ int pipeAsyncSinkForcedRetryTotalEventQueueSize) {
+ if (this.pipeAsyncSinkForcedRetryTotalEventQueueSize
+ == pipeAsyncSinkForcedRetryTotalEventQueueSize) {
return;
}
- this.pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold =
- pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold;
+ this.pipeAsyncSinkForcedRetryTotalEventQueueSize =
pipeAsyncSinkForcedRetryTotalEventQueueSize;
logger.info(
- "pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold is set to
{}.",
- pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold);
+ "pipeAsyncSinkForcedRetryTotalEventQueueSize is set to {}.",
+ pipeAsyncSinkForcedRetryTotalEventQueueSize);
}
- public int getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold() {
- return pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold;
+ public int getPipeAsyncSinkForcedRetryTotalEventQueueSize() {
+ return pipeAsyncSinkForcedRetryTotalEventQueueSize;
}
public void setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 637628149b6..ce09fb1f291 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -183,16 +183,16 @@ public class PipeConfig {
return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled();
}
- public int getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold() {
- return
COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold();
+ public int getPipeAsyncSinkForcedRetryTsFileEventQueueSize() {
+ return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTsFileEventQueueSize();
}
- public int getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold() {
- return
COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold();
+ public int getPipeAsyncSinkForcedRetryTabletEventQueueSize() {
+ return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTabletEventQueueSize();
}
- public int getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold() {
- return
COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold();
+ public int getPipeAsyncSinkForcedRetryTotalEventQueueSize() {
+ return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTotalEventQueueSize();
}
public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
@@ -541,13 +541,13 @@ public class PipeConfig {
LOGGER.info(
"PipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold: {}",
- getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold());
+ getPipeAsyncSinkForcedRetryTsFileEventQueueSize());
LOGGER.info(
"PipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold: {}",
- getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold());
+ getPipeAsyncSinkForcedRetryTabletEventQueueSize());
LOGGER.info(
"PipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold: {}",
- getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold());
+ getPipeAsyncSinkForcedRetryTotalEventQueueSize());
LOGGER.info(
"PipeAsyncConnectorMaxRetryExecutionTimeMsPerCall: {}",
getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall());
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 77aae8a3252..928ff5f25a5 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -368,7 +368,7 @@ public class PipeDescriptor {
"pipe_async_connector_max_retry_execution_time_ms_per_call",
String.valueOf(
config.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())))));
- config.setPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold(
+ config.setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
Integer.parseInt(
Optional.ofNullable(
properties.getProperty("pipe_async_sink_forced_retry_tsfile_event_queue_size"))
@@ -376,9 +376,8 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_async_connector_forced_retry_tsfile_event_queue_size",
String.valueOf(
- config
-
.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold())))));
- config.setPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold(
+
config.getPipeAsyncSinkForcedRetryTsFileEventQueueSize())))));
+ config.setPipeAsyncSinkForcedRetryTabletEventQueueSize(
Integer.parseInt(
Optional.ofNullable(
properties.getProperty("pipe_async_sink_forced_retry_tablet_event_queue_size"))
@@ -386,18 +385,15 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_async_connector_forced_retry_tablet_event_queue_size",
String.valueOf(
- config
-
.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold())))));
- config.setPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold(
+
config.getPipeAsyncSinkForcedRetryTabletEventQueueSize())))));
+ config.setPipeAsyncSinkForcedRetryTotalEventQueueSize(
Integer.parseInt(
Optional.ofNullable(
properties.getProperty("pipe_async_sink_forced_retry_total_event_queue_size"))
.orElse(
properties.getProperty(
"pipe_async_connector_forced_retry_total_event_queue_size",
- String.valueOf(
- config
-
.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold())))));
+
String.valueOf(config.getPipeAsyncSinkForcedRetryTotalEventQueueSize())))));
config.setRateLimiterHotReloadCheckIntervalMs(
Integer.parseInt(
properties.getProperty(