This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch connection-retry
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/connection-retry by this push:
     new f11db251fef fix-part
f11db251fef is described below

commit f11db251fef11b4db11cd7f883aa34a588ade11f
Author: Caideyipi <[email protected]>
AuthorDate: Thu Feb 5 18:38:02 2026 +0800

    fix-part
---
 .../java/org/apache/iotdb/commons/conf/CommonConfig.java     |  2 +-
 .../pipe/agent/task/subtask/PipeAbstractSinkSubtask.java     |  4 +++-
 .../apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java   | 12 ++++++------
 3 files changed, 10 insertions(+), 8 deletions(-)

diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index fdc1bf7007d..b33ee2837e4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -270,7 +270,7 @@ public class CommonConfig {
   private int pipeSinkTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
   private int pipeSinkReadFileBufferSize = 5242880; // 5MB
   private boolean isPipeSinkReadFileBufferMemoryControlEnabled = false;
-  private long pipeSinkRetryIntervalMs = 1000L;
+  private long pipeSinkRetryIntervalMs = 800L;
   private boolean pipeSinkRetryLocallyForConnectionError = true;
   private boolean pipeSinkRPCThriftCompressionEnabled = false;
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 70825d3aaec..85cc9e06490 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -196,7 +196,9 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
 
     // Stop current pipe task directly if failed to reconnect to
     // the target system after MAX_RETRY_TIMES times
-    if (retry == MAX_RETRY_TIMES && lastEvent instanceof EnrichedEvent) {
+    if (retry == MAX_RETRY_TIMES
+        && lastEvent instanceof EnrichedEvent
+        && 
!PipeConfig.getInstance().isPipeSinkRetryLocallyForConnectionError()) {
       report(
           (EnrichedEvent) lastEvent,
           new PipeRuntimeSinkCriticalException(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
index 9dda99c22d3..88a8b71775f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
@@ -398,7 +398,7 @@ public abstract class IoTDBSink implements PipeConnector {
 
     nodeUrls.clear();
     nodeUrls.addAll(parseNodeUrls(parameters));
-    LOGGER.info("IoTDBConnector nodeUrls: {}", nodeUrls);
+    LOGGER.info("IoTDBSink nodeUrls: {}", nodeUrls);
 
     isTabletBatchModeEnabled =
         parameters.getBooleanOrDefault(
@@ -410,7 +410,7 @@ public abstract class IoTDBSink implements PipeConnector {
                     Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
                     CONNECTOR_FORMAT_HYBRID_VALUE)
                 .equals(CONNECTOR_FORMAT_TS_FILE_VALUE);
-    LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", 
isTabletBatchModeEnabled);
+    LOGGER.info("IoTDBSink isTabletBatchModeEnabled: {}", 
isTabletBatchModeEnabled);
 
     final boolean shouldMarkAsGeneralWriteRequest =
         parameters.getBooleanOrDefault(
@@ -426,7 +426,7 @@ public abstract class IoTDBSink implements PipeConnector {
               Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY, 
SINK_MARK_AS_PIPE_REQUEST_KEY),
               CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE);
     }
-    LOGGER.info("IoTDBConnector shouldMarkAsPipeRequest: {}", 
shouldMarkAsPipeRequest);
+    LOGGER.info("IoTDBSink shouldMarkAsPipeRequest: {}", 
shouldMarkAsPipeRequest);
 
     final String connectorSkipIfValue =
         parameters
@@ -445,7 +445,7 @@ public abstract class IoTDBSink implements PipeConnector {
       throw new PipeParameterNotValidException(
           String.format("Parameters in set %s are not allowed in 'skipif'", 
skipIfOptionSet));
     }
-    LOGGER.info("IoTDBConnector skipIfNoPrivileges: {}", skipIfNoPrivileges);
+    LOGGER.info("IoTDBSink skipIfNoPrivileges: {}", skipIfNoPrivileges);
 
     receiverStatusHandler =
         new PipeReceiverStatusHandler(
@@ -485,7 +485,7 @@ public abstract class IoTDBSink implements PipeConnector {
                 SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY),
             CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE);
     LOGGER.info(
-        "IoTDBConnector {} = {}",
+        "IoTDBSink {} = {}",
         CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY,
         shouldReceiverConvertOnTypeMismatch);
     isRealtimeFirst =
@@ -495,7 +495,7 @@ public abstract class IoTDBSink implements PipeConnector {
                 PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
             PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
     LOGGER.info(
-        "IoTDBConnector {} = {}", 
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, isRealtimeFirst);
+        "IoTDBSink {} = {}", PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, 
isRealtimeFirst);
   }
 
   protected LinkedHashSet<TEndPoint> parseNodeUrls(final PipeParameters 
parameters)

Reply via email to