This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch connection-retry
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/connection-retry by this push:
new f11db251fef fix-part
f11db251fef is described below
commit f11db251fef11b4db11cd7f883aa34a588ade11f
Author: Caideyipi <[email protected]>
AuthorDate: Thu Feb 5 18:38:02 2026 +0800
fix-part
---
.../java/org/apache/iotdb/commons/conf/CommonConfig.java | 2 +-
.../pipe/agent/task/subtask/PipeAbstractSinkSubtask.java | 4 +++-
.../apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java | 12 ++++++------
3 files changed, 10 insertions(+), 8 deletions(-)
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index fdc1bf7007d..b33ee2837e4 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -270,7 +270,7 @@ public class CommonConfig {
private int pipeSinkTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
private int pipeSinkReadFileBufferSize = 5242880; // 5MB
private boolean isPipeSinkReadFileBufferMemoryControlEnabled = false;
- private long pipeSinkRetryIntervalMs = 1000L;
+ private long pipeSinkRetryIntervalMs = 800L;
private boolean pipeSinkRetryLocallyForConnectionError = true;
private boolean pipeSinkRPCThriftCompressionEnabled = false;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 70825d3aaec..85cc9e06490 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -196,7 +196,9 @@ public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask {
// Stop current pipe task directly if failed to reconnect to
// the target system after MAX_RETRY_TIMES times
- if (retry == MAX_RETRY_TIMES && lastEvent instanceof EnrichedEvent) {
+ if (retry == MAX_RETRY_TIMES
+ && lastEvent instanceof EnrichedEvent
+ && !PipeConfig.getInstance().isPipeSinkRetryLocallyForConnectionError()) {
report(
(EnrichedEvent) lastEvent,
new PipeRuntimeSinkCriticalException(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
index 9dda99c22d3..88a8b71775f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
@@ -398,7 +398,7 @@ public abstract class IoTDBSink implements PipeConnector {
nodeUrls.clear();
nodeUrls.addAll(parseNodeUrls(parameters));
- LOGGER.info("IoTDBConnector nodeUrls: {}", nodeUrls);
+ LOGGER.info("IoTDBSink nodeUrls: {}", nodeUrls);
isTabletBatchModeEnabled =
parameters.getBooleanOrDefault(
@@ -410,7 +410,7 @@ public abstract class IoTDBSink implements PipeConnector {
Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
CONNECTOR_FORMAT_HYBRID_VALUE)
.equals(CONNECTOR_FORMAT_TS_FILE_VALUE);
- LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", isTabletBatchModeEnabled);
+ LOGGER.info("IoTDBSink isTabletBatchModeEnabled: {}", isTabletBatchModeEnabled);
final boolean shouldMarkAsGeneralWriteRequest =
parameters.getBooleanOrDefault(
@@ -426,7 +426,7 @@ public abstract class IoTDBSink implements PipeConnector {
Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY, SINK_MARK_AS_PIPE_REQUEST_KEY),
CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE);
}
- LOGGER.info("IoTDBConnector shouldMarkAsPipeRequest: {}", shouldMarkAsPipeRequest);
+ LOGGER.info("IoTDBSink shouldMarkAsPipeRequest: {}", shouldMarkAsPipeRequest);
final String connectorSkipIfValue =
parameters
@@ -445,7 +445,7 @@ public abstract class IoTDBSink implements PipeConnector {
throw new PipeParameterNotValidException(
String.format("Parameters in set %s are not allowed in 'skipif'", skipIfOptionSet));
}
- LOGGER.info("IoTDBConnector skipIfNoPrivileges: {}", skipIfNoPrivileges);
+ LOGGER.info("IoTDBSink skipIfNoPrivileges: {}", skipIfNoPrivileges);
receiverStatusHandler =
new PipeReceiverStatusHandler(
@@ -485,7 +485,7 @@ public abstract class IoTDBSink implements PipeConnector {
SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY),
CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE);
LOGGER.info(
- "IoTDBConnector {} = {}",
+ "IoTDBSink {} = {}",
CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY,
shouldReceiverConvertOnTypeMismatch);
isRealtimeFirst =
@@ -495,7 +495,7 @@ public abstract class IoTDBSink implements PipeConnector {
PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
LOGGER.info(
- "IoTDBConnector {} = {}", PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, isRealtimeFirst);
+ "IoTDBSink {} = {}", PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, isRealtimeFirst);
}
protected LinkedHashSet<TEndPoint> parseNodeUrls(final PipeParameters parameters)