This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 403cf64f911 [To dev/1.3] Pipe: Modify the Pipe configuration item log
name to be consistent with the Properties name. (#16732) (#16759)
403cf64f911 is described below
commit 403cf64f9119d9f5c13dfece52596df43ab448aa
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Nov 17 23:00:43 2025 +0800
[To dev/1.3] Pipe: Modify the Pipe configuration item log name to be
consistent with the Properties name. (#16732) (#16759)
* Pipe: Modify the Pipe configuration item log name to be consistent with
the Properties name. (#16732)
* Pipe: Modify the Pipe configuration item log name to be consistent with
the Properties name.
* update
(cherry picked from commit a6191d916b2ceca7f3b22e728ae4117b99290261)
* update
* update
---
.../iotdb/tool/tsfile/ImportTsFileRemotely.java | 3 +-
.../exchange/sender/TwoStageAggregateSender.java | 2 +-
.../client/IoTDBDataNodeAsyncClientManager.java | 4 +-
.../thrift/async/IoTDBDataRegionAsyncSink.java | 18 ++---
.../apache/iotdb/commons/conf/CommonConfig.java | 84 +++++++++++-----------
.../iotdb/commons/pipe/config/PipeConfig.java | 31 ++++----
.../iotdb/commons/pipe/config/PipeDescriptor.java | 20 +++---
.../pipe/sink/client/IoTDBSyncClientManager.java | 2 +-
8 files changed, 75 insertions(+), 89 deletions(-)
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
index d842b9803af..855c1678a84 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
@@ -295,8 +295,7 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
this.client =
new IoTDBSyncClient(
new ThriftClientProperty.Builder()
- .setConnectionTimeoutMs(
-
PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs())
+
.setConnectionTimeoutMs(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs())
.setRpcThriftCompressionEnabled(
PipeConfig.getInstance().isPipeConnectorRPCThriftCompressionEnabled())
.build(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
index bac357368c0..45d6e45d25c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
@@ -208,7 +208,7 @@ public class TwoStageAggregateSender implements
AutoCloseable {
private IoTDBSyncClient constructIoTDBSyncClient(TEndPoint endPoint) throws
TTransportException {
return new IoTDBSyncClient(
new ThriftClientProperty.Builder()
-
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
+
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs())
.setRpcThriftCompressionEnabled(
PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
.build(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 5901e488a58..97a4d2621b0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -300,7 +300,7 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
PipeTransferHandshakeConstant.HANDSHAKE_KEY_MARK_AS_PIPE_REQUEST,
Boolean.toString(shouldMarkAsPipeRequest));
-
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
+
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs());
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params),
callback);
waitHandshakeFinished(isHandshakeFinished);
@@ -319,7 +319,7 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
resp.set(null);
exception.set(null);
-
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
+
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs());
client.pipeTransfer(
PipeTransferDataNodeHandshakeV1Req.toTPipeTransferReq(
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 2b57d94bfc3..e8eca26293c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -519,14 +519,11 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
if ((retryEventQueue.isEmpty() && retryTsFileQueue.isEmpty())
|| (!forced
&& retryEventQueueEventCounter.getTabletInsertionEventCount()
- < PipeConfig.getInstance()
-
.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold()
+ <
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTabletEventQueueSize()
&& retryEventQueueEventCounter.getTsFileInsertionEventCount()
- < PipeConfig.getInstance()
-
.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold()
+ <
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTsFileEventQueueSize()
&& retryEventQueue.size() + retryTsFileQueue.size()
- < PipeConfig.getInstance()
-
.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold())) {
+ <
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTotalEventQueueSize())) {
return;
}
@@ -584,14 +581,11 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
if (System.currentTimeMillis() - retryStartTime
>
PipeConfig.getInstance().getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())
{
if (retryEventQueueEventCounter.getTabletInsertionEventCount()
- < PipeConfig.getInstance()
-
.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold()
+ <
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTabletEventQueueSize()
&& retryEventQueueEventCounter.getTsFileInsertionEventCount()
- < PipeConfig.getInstance()
-
.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold()
+ <
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTsFileEventQueueSize()
&& retryEventQueue.size() + retryTsFileQueue.size()
- < PipeConfig.getInstance()
-
.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold()) {
+ <
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTotalEventQueueSize()) {
return;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 091fb56def2..9951fe217a8 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -259,16 +259,16 @@ public class CommonConfig {
private long pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes = 72 * KB;
private long pipeSourceMatcherCacheSize = 1024;
- private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
+ private int pipeSinkHandshakeTimeoutMs = 10 * 1000; // 10 seconds
private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
private int pipeConnectorReadFileBufferSize = 5242880; // 5MB
private boolean isPipeConnectorReadFileBufferMemoryControlEnabled = false;
private long pipeConnectorRetryIntervalMs = 1000L;
private boolean pipeConnectorRPCThriftCompressionEnabled = false;
- private int pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold = 5;
- private int pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold = 20;
- private int pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold = 30;
+ private int pipeAsyncSinkForcedRetryTsFileEventQueueSize = 5;
+ private int pipeAsyncSinkForcedRetryTabletEventQueueSize = 20;
+ private int pipeAsyncSinkForcedRetryTotalEventQueueSize = 30;
private long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall = 500;
private int pipeAsyncConnectorSelectorNumber =
Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
@@ -988,22 +988,21 @@ public class CommonConfig {
logger.info("pipeExtractorMatcherCacheSize is set to {}.",
pipeSourceMatcherCacheSize);
}
- public int getPipeConnectorHandshakeTimeoutMs() {
- return pipeConnectorHandshakeTimeoutMs;
+ public int getPipeSinkHandshakeTimeoutMs() {
+ return pipeSinkHandshakeTimeoutMs;
}
- public void setPipeConnectorHandshakeTimeoutMs(long
pipeConnectorHandshakeTimeoutMs) {
- final int fPipeConnectorHandshakeTimeoutMs =
this.pipeConnectorHandshakeTimeoutMs;
+ public void setPipeSinkHandshakeTimeoutMs(long pipeSinkHandshakeTimeoutMs) {
+ final int fPipeConnectorHandshakeTimeoutMs =
this.pipeSinkHandshakeTimeoutMs;
try {
- this.pipeConnectorHandshakeTimeoutMs =
Math.toIntExact(pipeConnectorHandshakeTimeoutMs);
+ this.pipeSinkHandshakeTimeoutMs =
Math.toIntExact(pipeSinkHandshakeTimeoutMs);
} catch (ArithmeticException e) {
- this.pipeConnectorHandshakeTimeoutMs = Integer.MAX_VALUE;
+ this.pipeSinkHandshakeTimeoutMs = Integer.MAX_VALUE;
logger.warn(
"Given pipe connector handshake timeout is too large, set to {} ms.", Integer.MAX_VALUE);
} finally {
- if (fPipeConnectorHandshakeTimeoutMs !=
this.pipeConnectorHandshakeTimeoutMs) {
- logger.info(
- "pipeConnectorHandshakeTimeoutMs is set to {}.",
fPipeConnectorHandshakeTimeoutMs);
+ if (fPipeConnectorHandshakeTimeoutMs != this.pipeSinkHandshakeTimeoutMs)
{
+ logger.info("pipeSinkHandshakeTimeoutMs is set to {}.",
this.pipeSinkHandshakeTimeoutMs);
}
}
}
@@ -1072,55 +1071,54 @@ public class CommonConfig {
return pipeConnectorRPCThriftCompressionEnabled;
}
- public void setPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold(
- int pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold) {
- if (this.pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold
- == pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold) {
+ public void setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
+ int pipeAsyncSinkForcedRetryTsFileEventQueueSize) {
+ if (this.pipeAsyncSinkForcedRetryTsFileEventQueueSize
+ == pipeAsyncSinkForcedRetryTsFileEventQueueSize) {
return;
}
- this.pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold =
- pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold;
+ this.pipeAsyncSinkForcedRetryTsFileEventQueueSize =
+ pipeAsyncSinkForcedRetryTsFileEventQueueSize;
logger.info(
- "pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold is set to {}.",
- pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold);
+ "pipeAsyncSinkForcedRetryTsFileEventQueueSize is set to {}.",
+ pipeAsyncSinkForcedRetryTsFileEventQueueSize);
}
- public int getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold() {
- return pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold;
+ public int getPipeAsyncSinkForcedRetryTsFileEventQueueSize() {
+ return pipeAsyncSinkForcedRetryTsFileEventQueueSize;
}
- public void setPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold(
- int pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold) {
- if (this.pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold
- == pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold) {
+ public void setPipeAsyncSinkForcedRetryTabletEventQueueSize(
+ int pipeAsyncSinkForcedRetryTabletEventQueueSize) {
+ if (this.pipeAsyncSinkForcedRetryTabletEventQueueSize
+ == pipeAsyncSinkForcedRetryTabletEventQueueSize) {
return;
}
- this.pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold =
- pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold;
+ this.pipeAsyncSinkForcedRetryTabletEventQueueSize =
+ pipeAsyncSinkForcedRetryTabletEventQueueSize;
logger.info(
- "pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold is set to {}.",
- pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold);
+ "pipeAsyncSinkForcedRetryTabletEventQueueSize is set to {}.",
+ pipeAsyncSinkForcedRetryTabletEventQueueSize);
}
- public int getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold() {
- return pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold;
+ public int getPipeAsyncSinkForcedRetryTabletEventQueueSize() {
+ return pipeAsyncSinkForcedRetryTabletEventQueueSize;
}
- public void setPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold(
- int pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold) {
- if (this.pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold
- == pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold) {
+ public void setPipeAsyncSinkForcedRetryTotalEventQueueSize(
+ int pipeAsyncSinkForcedRetryTotalEventQueueSize) {
+ if (this.pipeAsyncSinkForcedRetryTotalEventQueueSize
+ == pipeAsyncSinkForcedRetryTotalEventQueueSize) {
return;
}
- this.pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold =
- pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold;
+ this.pipeAsyncSinkForcedRetryTotalEventQueueSize =
pipeAsyncSinkForcedRetryTotalEventQueueSize;
logger.info(
- "pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold is set to {}.",
- pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold);
+ "pipeAsyncSinkForcedRetryTotalEventQueueSize is set to {}.",
+ pipeAsyncSinkForcedRetryTotalEventQueueSize);
}
- public int getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold() {
- return pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold;
+ public int getPipeAsyncSinkForcedRetryTotalEventQueueSize() {
+ return pipeAsyncSinkForcedRetryTotalEventQueueSize;
}
public void setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 2cd745a5e67..1ab26170d19 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -159,8 +159,8 @@ public class PipeConfig {
/////////////////////////////// Connector ///////////////////////////////
- public int getPipeConnectorHandshakeTimeoutMs() {
- return COMMON_CONFIG.getPipeConnectorHandshakeTimeoutMs();
+ public int getPipeSinkHandshakeTimeoutMs() {
+ return COMMON_CONFIG.getPipeSinkHandshakeTimeoutMs();
}
public int getPipeConnectorTransferTimeoutMs() {
@@ -183,16 +183,16 @@ public class PipeConfig {
return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled();
}
- public int getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold() {
- return
COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold();
+ public int getPipeAsyncSinkForcedRetryTsFileEventQueueSize() {
+ return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTsFileEventQueueSize();
}
- public int getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold() {
- return
COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold();
+ public int getPipeAsyncSinkForcedRetryTabletEventQueueSize() {
+ return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTabletEventQueueSize();
}
- public int getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold() {
- return
COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold();
+ public int getPipeAsyncSinkForcedRetryTotalEventQueueSize() {
+ return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTotalEventQueueSize();
}
public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
@@ -493,7 +493,7 @@ public class PipeConfig {
getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes());
LOGGER.info("PipeSourceMatcherCacheSize: {}",
getPipeSourceMatcherCacheSize());
- LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}",
getPipeConnectorHandshakeTimeoutMs());
+ LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}",
getPipeSinkHandshakeTimeoutMs());
LOGGER.info("PipeConnectorTransferTimeoutMs: {}",
getPipeConnectorTransferTimeoutMs());
LOGGER.info("PipeConnectorReadFileBufferSize: {}",
getPipeConnectorReadFileBufferSize());
LOGGER.info(
@@ -540,15 +540,14 @@ public class PipeConfig {
getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold());
LOGGER.info(
- "PipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold: {}",
- getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold());
+ "PipeAsyncSinkForcedRetryTsFileEventQueueSize: {}",
+ getPipeAsyncSinkForcedRetryTsFileEventQueueSize());
LOGGER.info(
- "PipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold: {}",
- getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold());
+ "PipeAsyncSinkForcedRetryTabletEventQueueSize: {}",
+ getPipeAsyncSinkForcedRetryTabletEventQueueSize());
LOGGER.info(
- "PipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold: {}",
- getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold());
-
+ "PipeAsyncSinkForcedRetryTotalEventQueueSize: {}",
+ getPipeAsyncSinkForcedRetryTotalEventQueueSize());
LOGGER.info(
"PipeAsyncConnectorMaxRetryExecutionTimeMsPerCall: {}",
getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index a0b29a5f716..ec6860be3fb 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -323,13 +323,13 @@ public class PipeDescriptor {
"pipe_extractor_matcher_cache_size",
String.valueOf(config.getPipeSourceMatcherCacheSize())))));
- config.setPipeConnectorHandshakeTimeoutMs(
+ config.setPipeSinkHandshakeTimeoutMs(
Long.parseLong(
Optional.ofNullable(properties.getProperty("pipe_sink_handshake_timeout_ms"))
.orElse(
properties.getProperty(
"pipe_connector_handshake_timeout_ms",
-
String.valueOf(config.getPipeConnectorHandshakeTimeoutMs())))));
+
String.valueOf(config.getPipeSinkHandshakeTimeoutMs())))));
config.setPipeConnectorReadFileBufferSize(
Integer.parseInt(
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_size"))
@@ -368,7 +368,7 @@ public class PipeDescriptor {
"pipe_async_connector_max_retry_execution_time_ms_per_call",
String.valueOf(
config.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())))));
- config.setPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold(
+ config.setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
Integer.parseInt(
Optional.ofNullable(
properties.getProperty("pipe_async_sink_forced_retry_tsfile_event_queue_size"))
@@ -376,9 +376,8 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_async_connector_forced_retry_tsfile_event_queue_size",
String.valueOf(
- config
-
.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold())))));
- config.setPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold(
+
config.getPipeAsyncSinkForcedRetryTsFileEventQueueSize())))));
+ config.setPipeAsyncSinkForcedRetryTabletEventQueueSize(
Integer.parseInt(
Optional.ofNullable(
properties.getProperty("pipe_async_sink_forced_retry_tablet_event_queue_size"))
@@ -386,18 +385,15 @@ public class PipeDescriptor {
properties.getProperty(
"pipe_async_connector_forced_retry_tablet_event_queue_size",
String.valueOf(
- config
-
.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold())))));
- config.setPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold(
+
config.getPipeAsyncSinkForcedRetryTabletEventQueueSize())))));
+ config.setPipeAsyncSinkForcedRetryTotalEventQueueSize(
Integer.parseInt(
Optional.ofNullable(
properties.getProperty("pipe_async_sink_forced_retry_total_event_queue_size"))
.orElse(
properties.getProperty(
"pipe_async_connector_forced_retry_total_event_queue_size",
- String.valueOf(
- config
-
.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold())))));
+
String.valueOf(config.getPipeAsyncSinkForcedRetryTotalEventQueueSize())))));
config.setRateLimiterHotReloadCheckIntervalMs(
Integer.parseInt(
properties.getProperty(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
index 7dc92b8cba2..fa5f0d3383c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
@@ -193,7 +193,7 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
clientAndStatus.setLeft(
new IoTDBSyncClient(
new ThriftClientProperty.Builder()
-
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
+
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs())
.setRpcThriftCompressionEnabled(
PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
.build(),