This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch connection-local-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b4b4e811f38cbf546999f963b0fde5860f4e2b93
Author: Caideyipi <[email protected]>
AuthorDate: Mon Feb 9 14:28:17 2026 +0800

    Pipe: Enabled locally retry for PipeConnectionException (#17182)
    
    * fix
    
    * may-fix
    
    * fix
    
    * fix
    
    * try-complete
    
    * fix-part
    
    * Update IoTDBDataRegionAsyncSink.java
    
    * fix
    
    * fix
---
 .../iotdb/tool/tsfile/ImportTsFileRemotely.java    |   6 +-
 .../exchange/sender/TwoStageAggregateSender.java   |   3 +-
 .../protocol/airgap/IoTDBAirGapReceiver.java       |   2 +-
 .../sink/protocol/legacy/IoTDBLegacyPipeSink.java  |   2 +-
 .../pipeconsensus/PipeConsensusSyncSink.java       |   2 +-
 .../PipeConsensusTsFileInsertionEventHandler.java  |   2 +-
 .../thrift/async/IoTDBDataRegionAsyncSink.java     |  39 +++++---
 .../PipeTransferTabletBatchEventHandler.java       |   2 +-
 .../PipeTransferTabletInsertionEventHandler.java   |   2 +-
 .../async/handler/PipeTransferTsFileHandler.java   |   8 +-
 .../iotdb/commons/client/ClientPoolFactory.java    |  20 ++--
 .../apache/iotdb/commons/conf/CommonConfig.java    | 111 ++++++++++++---------
 .../task/subtask/PipeAbstractSinkSubtask.java      |  20 ++--
 .../agent/task/subtask/PipeReportableSubtask.java  |  23 +++--
 .../iotdb/commons/pipe/config/PipeConfig.java      |  85 ++++++++--------
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |  32 +++---
 .../pipe/sink/client/IoTDBClientManager.java       |   2 +-
 .../commons/pipe/sink/client/IoTDBSyncClient.java  |   2 +-
 .../pipe/sink/client/IoTDBSyncClientManager.java   |   2 +-
 .../pipe/sink/limiter/GlobalRPCRateLimiter.java    |   2 +-
 .../pipe/sink/protocol/IoTDBAirGapSink.java        |   4 +-
 .../commons/pipe/sink/protocol/IoTDBSink.java      |  12 +--
 .../pipe/sink/protocol/IoTDBSslSyncSink.java       |   2 +-
 23 files changed, 210 insertions(+), 175 deletions(-)

diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
index 855c1678a84..fefff821fa1 100644
--- 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
@@ -162,7 +162,7 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
                 "Handshake error with target server ip: %s, port: %s, because: 
%s.",
                 client.getIpAddress(), client.getPort(), resp.getStatus()));
       } else {
-        
client.setTimeout(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+        
client.setTimeout(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs());
         IOT_PRINTER.println(
             String.format(
                 "Handshake success. Target server ip: %s, port: %s",
@@ -228,7 +228,7 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
 
   private void transferFilePieces(final File file, final boolean isMultiFile)
       throws PipeException, IOException {
-    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
     final byte[] readBuffer = new byte[readFileBufferSize];
     long position = 0;
     try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
@@ -297,7 +297,7 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
               new ThriftClientProperty.Builder()
                   
.setConnectionTimeoutMs(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs())
                   .setRpcThriftCompressionEnabled(
-                      
PipeConfig.getInstance().isPipeConnectorRPCThriftCompressionEnabled())
+                      
PipeConfig.getInstance().isPipeSinkRPCThriftCompressionEnabled())
                   .build(),
               getEndPoint().getIp(),
               getEndPoint().getPort(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
index 45d6e45d25c..3c36559a300 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
@@ -209,8 +209,7 @@ public class TwoStageAggregateSender implements 
AutoCloseable {
     return new IoTDBSyncClient(
         new ThriftClientProperty.Builder()
             
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs())
-            .setRpcThriftCompressionEnabled(
-                PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
+            
.setRpcThriftCompressionEnabled(PIPE_CONFIG.isPipeSinkRPCThriftCompressionEnabled())
             .build(),
         endPoint.getIp(),
         endPoint.getPort(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 3cea6c998f8..610b9e5fe1a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -70,7 +70,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
 
   @Override
   public void runMayThrow() throws Throwable {
-    
socket.setSoTimeout(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+    
socket.setSoTimeout(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs());
     socket.setKeepAlive(true);
 
     LOGGER.info("Pipe air gap receiver {} started. Socket: {}", receiverId, 
socket);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
index 406c5cb4fdc..4328c758d39 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
@@ -385,7 +385,7 @@ public class IoTDBLegacyPipeSink implements PipeConnector {
     long position = 0;
 
     // Try small piece to rebase the file position.
-    final byte[] buffer = new 
byte[PipeConfig.getInstance().getPipeConnectorReadFileBufferSize()];
+    final byte[] buffer = new 
byte[PipeConfig.getInstance().getPipeSinkReadFileBufferSize()];
     try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file, 
"r")) {
       while (true) {
         final int dataLength = randomAccessFile.read(buffer);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
index 7527452c134..b059e484734 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
@@ -373,7 +373,7 @@ public class PipeConsensusSyncSink extends IoTDBSink {
       final TCommitId tCommitId,
       final TConsensusGroupId tConsensusGroupId)
       throws PipeException, IOException {
-    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
     final byte[] readBuffer = new byte[readFileBufferSize];
     long position = 0;
     try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
index 4fccd62bffa..22b55239e19 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
@@ -100,7 +100,7 @@ public class PipeConsensusTsFileInsertionEventHandler
     transferMod = event.isWithMod();
     currentFile = transferMod ? modFile : tsFile;
 
-    readFileBufferSize = 
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+    readFileBufferSize = 
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
     readBuffer = new byte[readFileBufferSize];
     position = 0;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index e8eca26293c..aab891c6c7a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -20,6 +20,9 @@
 package org.apache.iotdb.db.pipe.sink.protocol.thrift.async;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+
+import org.apache.iotdb.commons.audit.UserEntity;
+import org.apache.iotdb.commons.client.ThriftClient;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
@@ -56,6 +59,7 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
@@ -121,6 +125,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
       new ConcurrentHashMap<>();
 
   private boolean enableSendTsFileLimit;
+  private volatile boolean isConnectionException;
 
   @Override
   public void validate(final PipeParameterValidator validator) throws 
Exception {
@@ -251,10 +256,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
                   null,
                   false));
         }
-      } catch (final Throwable t) {
-        LOGGER.warn("Failed to transfer tsfile batch ({}).", sealedFiles, t);
+      } catch (final Exception e) {
+        LOGGER.warn("Failed to transfer tsfile batch ({}).", sealedFiles, e);
         if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
-          addFailureEventsToRetryQueue(events);
+          addFailureEventsToRetryQueue(events, e);
         }
       }
     } else {
@@ -579,7 +584,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
 
       // Stop retrying if the execution time exceeds the threshold for better 
realtime performance
       if (System.currentTimeMillis() - retryStartTime
-          > 
PipeConfig.getInstance().getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall()) 
{
+          > 
PipeConfig.getInstance().getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall()) {
         if (retryEventQueueEventCounter.getTabletInsertionEventCount()
                 < 
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTabletEventQueueSize()
             && retryEventQueueEventCounter.getTsFileInsertionEventCount()
@@ -590,14 +595,17 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
         }
 
         if (remainingEvents <= retryEventQueue.size() + 
retryTsFileQueue.size()) {
-          throw new PipeException(
+          final String message =
               "Failed to retry transferring events in the retry queue. 
Remaining events: "
                   + (retryEventQueue.size() + retryTsFileQueue.size())
                   + " (tablet events: "
                   + retryEventQueueEventCounter.getTabletInsertionEventCount()
                   + ", tsfile events: "
                   + retryEventQueueEventCounter.getTsFileInsertionEventCount()
-                  + ").");
+                  + ").";
+          throw isConnectionException
+              ? new PipeConnectionException(message)
+              : new PipeException(message);
         }
       }
     }
@@ -613,7 +621,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
               
.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), false);
         }
       } catch (final Exception e) {
-        addFailureEventToRetryQueue(tabletInsertionEvent);
+        addFailureEventToRetryQueue(tabletInsertionEvent, e);
       }
       return;
     }
@@ -626,14 +634,14 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
               
.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), false);
         }
       } else {
-        addFailureEventToRetryQueue(tabletInsertionEvent);
+        addFailureEventToRetryQueue(tabletInsertionEvent, null);
       }
     } catch (final Exception e) {
       if (tabletInsertionEvent instanceof EnrichedEvent) {
         ((EnrichedEvent) tabletInsertionEvent)
             .decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), 
false);
       }
-      addFailureEventToRetryQueue(tabletInsertionEvent);
+      addFailureEventToRetryQueue(tabletInsertionEvent, e);
     }
   }
 
@@ -643,11 +651,11 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
         tsFileInsertionEvent.decreaseReferenceCount(
             IoTDBDataRegionAsyncSink.class.getName(), false);
       } else {
-        addFailureEventToRetryQueue(tsFileInsertionEvent);
+        addFailureEventToRetryQueue(tsFileInsertionEvent, null);
       }
     } catch (final Exception e) {
       
tsFileInsertionEvent.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(),
 false);
-      addFailureEventToRetryQueue(tsFileInsertionEvent);
+      addFailureEventToRetryQueue(tsFileInsertionEvent, e);
     }
   }
 
@@ -657,7 +665,9 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
    * @param event {@link Event} to retry
    */
   @SuppressWarnings("java:S899")
-  public void addFailureEventToRetryQueue(final Event event) {
+  public void addFailureEventToRetryQueue(final Event event, final Exception 
e) {
+    isConnectionException =
+        e instanceof PipeConnectionException || 
ThriftClient.isConnectionBroken(e);
     if (event instanceof EnrichedEvent && ((EnrichedEvent) 
event).isReleased()) {
       return;
     }
@@ -693,8 +703,9 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
    *
    * @param events {@link EnrichedEvent}s to retry
    */
-  public void addFailureEventsToRetryQueue(final Iterable<EnrichedEvent> 
events) {
-    events.forEach(this::addFailureEventToRetryQueue);
+  public void addFailureEventsToRetryQueue(
+      final Iterable<EnrichedEvent> events, final Exception e) {
+    events.forEach(event -> addFailureEventToRetryQueue(event, e));
   }
 
   public boolean isEnableSendTsFileLimit() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 539764cfc64..110d3cb6450 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -123,7 +123,7 @@ public class PipeTransferTabletBatchEventHandler extends 
PipeTransferTrackableHa
           events.size(),
           
events.stream().map(EnrichedEvent::getPipeName).collect(Collectors.toSet()));
     } finally {
-      connector.addFailureEventsToRetryQueue(events);
+      connector.addFailureEventsToRetryQueue(events, exception);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index af79c87c253..66a1f4a013b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -109,7 +109,7 @@ public abstract class 
PipeTransferTabletInsertionEventHandler extends PipeTransf
           event instanceof EnrichedEvent ? ((EnrichedEvent) 
event).getCommitterKey() : null,
           event instanceof EnrichedEvent ? ((EnrichedEvent) 
event).getCommitIds() : null);
     } finally {
-      connector.addFailureEventToRetryQueue(event);
+      connector.addFailureEventToRetryQueue(event, exception);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index c5ced29b735..1152710ec52 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -123,7 +123,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     readFileBufferSize =
         (int)
             Math.min(
-                PipeConfig.getInstance().getPipeConnectorReadFileBufferSize(),
+                PipeConfig.getInstance().getPipeSinkReadFileBufferSize(),
                 transferMod ? Math.max(tsFile.length(), modFile.length()) : 
tsFile.length());
     position = 0;
 
@@ -143,7 +143,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
       memoryBlock =
           PipeDataNodeResourceManager.memory()
               .forceAllocateForTsFileWithRetry(
-                  
PipeConfig.getInstance().isPipeConnectorReadFileBufferMemoryControlEnabled()
+                  
PipeConfig.getInstance().isPipeSinkReadFileBufferMemoryControlEnabled()
                       ? readFileBufferSize
                       : 0);
       readBuffer = new byte[readFileBufferSize];
@@ -407,7 +407,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
         returnClientIfNecessary();
       } finally {
         if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
-          connector.addFailureEventsToRetryQueue(events);
+          connector.addFailureEventsToRetryQueue(events, exception);
         }
       }
     }
@@ -469,7 +469,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
    * @param timeoutMs CAN NOT BE UNLIMITED, otherwise it may cause deadlock.
    */
   private void waitForResourceEnough4Slicing(final long timeoutMs) throws 
InterruptedException {
-    if 
(!PipeConfig.getInstance().isPipeConnectorReadFileBufferMemoryControlEnabled()) 
{
+    if 
(!PipeConfig.getInstance().isPipeSinkReadFileBufferMemoryControlEnabled()) {
       return;
     }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index 18f76a91889..3ff47a2c5e1 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -280,15 +280,13 @@ public class ClientPoolFactory {
               new AsyncPipeDataTransferServiceClient.Factory(
                   manager,
                   new ThriftClientProperty.Builder()
-                      
.setConnectionTimeoutMs(conf.getPipeConnectorTransferTimeoutMs())
-                      .setRpcThriftCompressionEnabled(
-                          conf.isPipeConnectorRPCThriftCompressionEnabled())
-                      .setSelectorNumOfAsyncClientManager(
-                          conf.getPipeAsyncConnectorSelectorNumber())
+                      
.setConnectionTimeoutMs(conf.getPipeSinkTransferTimeoutMs())
+                      
.setRpcThriftCompressionEnabled(conf.isPipeSinkRPCThriftCompressionEnabled())
+                      
.setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber())
                       .build(),
                   ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
               new 
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
-                  
.setMaxClientNumForEachNode(conf.getPipeAsyncConnectorMaxClientNumber())
+                  
.setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxClientNumber())
                   .build()
                   .getConfig());
       ClientManagerMetrics.getInstance()
@@ -307,16 +305,14 @@ public class ClientPoolFactory {
               new AsyncPipeDataTransferServiceClient.Factory(
                   manager,
                   new ThriftClientProperty.Builder()
-                      
.setConnectionTimeoutMs(conf.getPipeConnectorTransferTimeoutMs())
-                      .setRpcThriftCompressionEnabled(
-                          conf.isPipeConnectorRPCThriftCompressionEnabled())
-                      .setSelectorNumOfAsyncClientManager(
-                          conf.getPipeAsyncConnectorSelectorNumber())
+                      
.setConnectionTimeoutMs(conf.getPipeSinkTransferTimeoutMs())
+                      
.setRpcThriftCompressionEnabled(conf.isPipeSinkRPCThriftCompressionEnabled())
+                      
.setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber())
                       
.setPrintLogWhenEncounterException(conf.isPrintLogWhenEncounterException())
                       .build(),
                   ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
               new 
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
-                  
.setMaxClientNumForEachNode(conf.getPipeAsyncConnectorMaxTsFileClientNumber())
+                  
.setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxTsFileClientNumber())
                   .build()
                   .getConfig());
       ClientManagerMetrics.getInstance()
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 9e4212c0b2b..09b6beb8c13 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -263,11 +263,12 @@ public class CommonConfig {
   private long pipeSourceMatcherCacheSize = 1024;
 
   private int pipeSinkHandshakeTimeoutMs = 10 * 1000; // 10 seconds
-  private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
-  private int pipeConnectorReadFileBufferSize = 5242880; // 5MB
-  private boolean isPipeConnectorReadFileBufferMemoryControlEnabled = false;
-  private long pipeConnectorRetryIntervalMs = 1000L;
-  private boolean pipeConnectorRPCThriftCompressionEnabled = false;
+  private int pipeSinkTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
+  private int pipeSinkReadFileBufferSize = 5242880; // 5MB
+  private boolean isPipeSinkReadFileBufferMemoryControlEnabled = false;
+  private long pipeSinkRetryIntervalMs = 800L;
+  private boolean pipeSinkRetryLocallyForConnectionError = true;
+  private boolean pipeSinkRPCThriftCompressionEnabled = false;
 
   private int pipeAsyncSinkForcedRetryTsFileEventQueueSize = 5;
   private int pipeAsyncSinkForcedRetryTabletEventQueueSize = 20;
@@ -1012,68 +1013,65 @@ public class CommonConfig {
     }
   }
 
-  public int getPipeConnectorTransferTimeoutMs() {
-    return pipeConnectorTransferTimeoutMs;
+  public int getPipeSinkTransferTimeoutMs() {
+    return pipeSinkTransferTimeoutMs;
   }
 
-  public void setPipeConnectorTransferTimeoutMs(long 
pipeConnectorTransferTimeoutMs) {
-    final int fPipeConnectorTransferTimeoutMs = 
this.pipeConnectorTransferTimeoutMs;
+  public void setPipeSinkTransferTimeoutMs(long pipeSinkTransferTimeoutMs) {
+    final int fPipeConnectorTransferTimeoutMs = this.pipeSinkTransferTimeoutMs;
     try {
-      this.pipeConnectorTransferTimeoutMs = 
Math.toIntExact(pipeConnectorTransferTimeoutMs);
+      this.pipeSinkTransferTimeoutMs = 
Math.toIntExact(pipeSinkTransferTimeoutMs);
     } catch (ArithmeticException e) {
-      this.pipeConnectorTransferTimeoutMs = Integer.MAX_VALUE;
+      this.pipeSinkTransferTimeoutMs = Integer.MAX_VALUE;
       logger.warn(
           "Given pipe connector transfer timeout is too large, set to {} ms.", 
Integer.MAX_VALUE);
     } finally {
-      if (fPipeConnectorTransferTimeoutMs != 
this.pipeConnectorTransferTimeoutMs) {
-        logger.info("pipeConnectorTransferTimeoutMs is set to {}.", 
pipeConnectorTransferTimeoutMs);
+      if (fPipeConnectorTransferTimeoutMs != this.pipeSinkTransferTimeoutMs) {
+        logger.info("pipeConnectorTransferTimeoutMs is set to {}.", 
pipeSinkTransferTimeoutMs);
       }
     }
   }
 
-  public int getPipeConnectorReadFileBufferSize() {
-    return pipeConnectorReadFileBufferSize;
+  public int getPipeSinkReadFileBufferSize() {
+    return pipeSinkReadFileBufferSize;
   }
 
-  public void setPipeConnectorReadFileBufferSize(int 
pipeConnectorReadFileBufferSize) {
-    if (this.pipeConnectorReadFileBufferSize == 
pipeConnectorReadFileBufferSize) {
+  public void setPipeSinkReadFileBufferSize(int pipeSinkReadFileBufferSize) {
+    if (this.pipeSinkReadFileBufferSize == pipeSinkReadFileBufferSize) {
       return;
     }
-    this.pipeConnectorReadFileBufferSize = pipeConnectorReadFileBufferSize;
-    logger.info("pipeConnectorReadFileBufferSize is set to {}.", 
pipeConnectorReadFileBufferSize);
+    this.pipeSinkReadFileBufferSize = pipeSinkReadFileBufferSize;
+    logger.info("pipeConnectorReadFileBufferSize is set to {}.", 
pipeSinkReadFileBufferSize);
   }
 
-  public boolean isPipeConnectorReadFileBufferMemoryControlEnabled() {
-    return isPipeConnectorReadFileBufferMemoryControlEnabled;
+  public boolean isPipeSinkReadFileBufferMemoryControlEnabled() {
+    return isPipeSinkReadFileBufferMemoryControlEnabled;
   }
 
-  public void setIsPipeConnectorReadFileBufferMemoryControlEnabled(
-      boolean isPipeConnectorReadFileBufferMemoryControlEnabled) {
-    if (this.isPipeConnectorReadFileBufferMemoryControlEnabled
-        == isPipeConnectorReadFileBufferMemoryControlEnabled) {
+  public void setIsPipeSinkReadFileBufferMemoryControlEnabled(
+      boolean isPipeSinkReadFileBufferMemoryControlEnabled) {
+    if (this.isPipeSinkReadFileBufferMemoryControlEnabled
+        == isPipeSinkReadFileBufferMemoryControlEnabled) {
       return;
     }
-    this.isPipeConnectorReadFileBufferMemoryControlEnabled =
-        isPipeConnectorReadFileBufferMemoryControlEnabled;
+    this.isPipeSinkReadFileBufferMemoryControlEnabled =
+        isPipeSinkReadFileBufferMemoryControlEnabled;
     logger.info(
-        "isPipeConnectorReadFileBufferMemoryControlEnabled is set to {}.",
-        isPipeConnectorReadFileBufferMemoryControlEnabled);
+        "isPipeSinkReadFileBufferMemoryControlEnabled is set to {}.",
+        isPipeSinkReadFileBufferMemoryControlEnabled);
   }
 
-  public void setPipeConnectorRPCThriftCompressionEnabled(
-      boolean pipeConnectorRPCThriftCompressionEnabled) {
-    if (this.isPipeConnectorReadFileBufferMemoryControlEnabled
-        == pipeConnectorRPCThriftCompressionEnabled) {
+  public void setPipeSinkRPCThriftCompressionEnabled(boolean 
pipeSinkRPCThriftCompressionEnabled) {
+    if (this.isPipeSinkReadFileBufferMemoryControlEnabled == 
pipeSinkRPCThriftCompressionEnabled) {
       return;
     }
-    this.pipeConnectorRPCThriftCompressionEnabled = 
pipeConnectorRPCThriftCompressionEnabled;
+    this.pipeSinkRPCThriftCompressionEnabled = 
pipeSinkRPCThriftCompressionEnabled;
     logger.info(
-        "pipeConnectorRPCThriftCompressionEnabled is set to {}.",
-        pipeConnectorRPCThriftCompressionEnabled);
+        "pipeSinkRPCThriftCompressionEnabled is set to {}.", 
pipeSinkRPCThriftCompressionEnabled);
   }
 
-  public boolean isPipeConnectorRPCThriftCompressionEnabled() {
-    return pipeConnectorRPCThriftCompressionEnabled;
+  public boolean isPipeSinkRPCThriftCompressionEnabled() {
+    return pipeSinkRPCThriftCompressionEnabled;
   }
 
   public void setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
@@ -1139,11 +1137,11 @@ public class CommonConfig {
         pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall);
   }
 
-  public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
+  public long getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall() {
     return pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall;
   }
 
-  public int getPipeAsyncConnectorSelectorNumber() {
+  public int getPipeAsyncSinkSelectorNumber() {
     return pipeAsyncConnectorSelectorNumber;
   }
 
@@ -1161,7 +1159,7 @@ public class CommonConfig {
     logger.info("pipeAsyncConnectorSelectorNumber is set to {}.", 
pipeAsyncConnectorSelectorNumber);
   }
 
-  public int getPipeAsyncConnectorMaxClientNumber() {
+  public int getPipeAsyncSinkMaxClientNumber() {
     return pipeAsyncConnectorMaxClientNumber;
   }
 
@@ -1180,7 +1178,7 @@ public class CommonConfig {
         "pipeAsyncConnectorMaxClientNumber is set to {}.", 
pipeAsyncConnectorMaxClientNumber);
   }
 
-  public int getPipeAsyncConnectorMaxTsFileClientNumber() {
+  public int getPipeAsyncSinkMaxTsFileClientNumber() {
     return pipeAsyncConnectorMaxTsFileClientNumber;
   }
 
@@ -1297,16 +1295,31 @@ public class CommonConfig {
     logger.info("pipeAutoRestartEnabled is set to {}.", 
pipeAutoRestartEnabled);
   }
 
-  public long getPipeConnectorRetryIntervalMs() {
-    return pipeConnectorRetryIntervalMs;
+  public long getPipeSinkRetryIntervalMs() {
+    return pipeSinkRetryIntervalMs;
   }
 
-  public void setPipeConnectorRetryIntervalMs(long 
pipeConnectorRetryIntervalMs) {
-    if (this.pipeConnectorRetryIntervalMs == pipeConnectorRetryIntervalMs) {
+  public void setPipeSinkRetryIntervalMs(long pipeConnectorRetryIntervalMs) {
+    if (this.pipeSinkRetryIntervalMs == pipeConnectorRetryIntervalMs) {
       return;
     }
-    this.pipeConnectorRetryIntervalMs = pipeConnectorRetryIntervalMs;
-    logger.info("pipeConnectorRetryIntervalMs is set to {}", 
pipeConnectorRetryIntervalMs);
+    this.pipeSinkRetryIntervalMs = pipeConnectorRetryIntervalMs;
+    logger.info("pipeSinkRetryIntervalMs is set to {}", 
pipeConnectorRetryIntervalMs);
+  }
+
+  public boolean isPipeSinkRetryLocallyForConnectionError() {
+    return pipeSinkRetryLocallyForConnectionError;
+  }
+
+  public void setPipeSinkRetryLocallyForConnectionError(
+      boolean pipeSinkRetryLocallyForConnectionError) {
+    if (this.pipeSinkRetryLocallyForConnectionError == 
pipeSinkRetryLocallyForConnectionError) {
+      return;
+    }
+    this.pipeSinkRetryLocallyForConnectionError = 
pipeSinkRetryLocallyForConnectionError;
+    logger.info(
+        "pipeSinkRetryLocallyForConnectionError is set to {}",
+        pipeSinkRetryLocallyForConnectionError);
   }
 
   public int 
getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount() {
@@ -2013,7 +2026,7 @@ public class CommonConfig {
         "rateLimiterHotReloadCheckIntervalMs is set to {}", 
rateLimiterHotReloadCheckIntervalMs);
   }
 
-  public int getPipeConnectorRequestSliceThresholdBytes() {
+  public int getPipeSinkRequestSliceThresholdBytes() {
     return pipeConnectorRequestSliceThresholdBytes;
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 2c66adf7d91..fc02d253280 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -134,6 +134,12 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
           // return if the pipe task should be stopped
           return;
         }
+        if 
(PipeConfig.getInstance().isPipeSinkRetryLocallyForConnectionError()) {
+          super.onFailure(
+              new PipeRuntimeSinkNonReportTimeConfigurableException(
+                  throwable.getMessage(), Long.MAX_VALUE));
+          return;
+        }
       }
 
       // Handle exceptions if any available clients exist
@@ -144,9 +150,10 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
         super.onFailure(throwable);
       } else {
         // Print stack trace for better debugging
-        LOGGER.warn(
-            "A non PipeRuntimeConnectorCriticalException occurred, will throw 
a PipeRuntimeConnectorCriticalException.",
-            throwable);
+        PipeLogger.log(
+            LOGGER::warn,
+            throwable,
+            "A non PipeRuntimeSinkCriticalException occurred, will throw a 
PipeRuntimeSinkCriticalException.");
         super.onFailure(new 
PipeRuntimeSinkCriticalException(throwable.getMessage()));
       }
     }
@@ -179,8 +186,7 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
             MAX_RETRY_TIMES,
             e);
         try {
-          sleepIfNoHighPriorityTask(
-              retry * 
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
+          sleepIfNoHighPriorityTask(retry * 
PipeConfig.getInstance().getPipeSinkRetryIntervalMs());
         } catch (final InterruptedException interruptedException) {
           LOGGER.info(
               "Interrupted while sleeping, will retry to handshake with the 
target system.",
@@ -192,7 +198,9 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
 
     // Stop current pipe task directly if failed to reconnect to
     // the target system after MAX_RETRY_TIMES times
-    if (retry == MAX_RETRY_TIMES && lastEvent instanceof EnrichedEvent) {
+    if (retry == MAX_RETRY_TIMES
+        && lastEvent instanceof EnrichedEvent
+        && 
!PipeConfig.getInstance().isPipeSinkRetryLocallyForConnectionError()) {
       report(
           (EnrichedEvent) lastEvent,
           new PipeRuntimeSinkCriticalException(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
index 8555ca85f3a..aedb251b53a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
@@ -21,9 +21,11 @@ package org.apache.iotdb.commons.pipe.agent.task.subtask;
 
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
+import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,10 +51,11 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
       return;
     }
 
-    if (lastEvent instanceof EnrichedEvent) {
-      onEnrichedEventFailure(throwable);
+    if (lastEvent instanceof EnrichedEvent
+        && !(throwable instanceof 
PipeRuntimeSinkNonReportTimeConfigurableException)) {
+      onReportEventFailure(throwable);
     } else {
-      onNonEnrichedEventFailure(throwable);
+      onNonReportEventFailure(throwable);
     }
 
     // Although the pipe task will be stopped, we still don't release the last 
event here
@@ -61,7 +64,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
     // is dropped or the process is running normally.
   }
 
-  private void onEnrichedEventFailure(final Throwable throwable) {
+  private void onReportEventFailure(final Throwable throwable) {
     final int maxRetryTimes =
         throwable instanceof PipeRuntimeSinkRetryTimesConfigurableException
             ? ((PipeRuntimeSinkRetryTimesConfigurableException) 
throwable).getRetryTimes()
@@ -80,15 +83,16 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
 
     retryCount.incrementAndGet();
     if (retryCount.get() <= maxRetryTimes) {
-      LOGGER.warn(
+      PipeLogger.log(
+          LOGGER::warn,
+          throwable,
           "Retry executing subtask {} (creation time: {}, simple class: {}), 
retry count [{}/{}], last exception: {}",
           taskID,
           creationTime,
           this.getClass().getSimpleName(),
           retryCount.get(),
           maxRetryTimes,
-          throwable.getMessage(),
-          throwable);
+          throwable.getMessage());
       try {
         sleepIfNoHighPriorityTask(
             retryCount.get() * 
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
@@ -136,7 +140,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
 
   protected abstract void report(final EnrichedEvent event, final 
PipeRuntimeException exception);
 
-  private void onNonEnrichedEventFailure(final Throwable throwable) {
+  private void onNonReportEventFailure(final Throwable throwable) {
     if (retryCount.get() == 0) {
       LOGGER.warn(
           "Failed to execute subtask {} (creation time: {}, simple class: {}), 
"
@@ -149,7 +153,8 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
     }
 
     retryCount.incrementAndGet();
-    LOGGER.warn(
+    PipeLogger.log(
+        LOGGER::warn,
         "Retry executing subtask {} (creation time: {}, simple class: {}), 
retry count {}, last exception: {}",
         taskID,
         creationTime,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index a815800e23f..d51406acb53 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -103,7 +103,7 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeMinimumReceiverMemory();
   }
 
-  /////////////////////////////// Subtask Connector 
///////////////////////////////
+  /////////////////////////////// Subtask Sink ///////////////////////////////
 
   public int getPipeRealTimeQueuePollTsFileThreshold() {
     return COMMON_CONFIG.getPipeRealTimeQueuePollTsFileThreshold();
@@ -165,30 +165,34 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeSourceMatcherCacheSize();
   }
 
-  /////////////////////////////// Connector ///////////////////////////////
+  /////////////////////////////// Sink ///////////////////////////////
 
   public int getPipeSinkHandshakeTimeoutMs() {
     return COMMON_CONFIG.getPipeSinkHandshakeTimeoutMs();
   }
 
-  public int getPipeConnectorTransferTimeoutMs() {
-    return COMMON_CONFIG.getPipeConnectorTransferTimeoutMs();
+  public int getPipeSinkTransferTimeoutMs() {
+    return COMMON_CONFIG.getPipeSinkTransferTimeoutMs();
   }
 
-  public int getPipeConnectorReadFileBufferSize() {
-    return COMMON_CONFIG.getPipeConnectorReadFileBufferSize();
+  public int getPipeSinkReadFileBufferSize() {
+    return COMMON_CONFIG.getPipeSinkReadFileBufferSize();
   }
 
-  public boolean isPipeConnectorReadFileBufferMemoryControlEnabled() {
-    return COMMON_CONFIG.isPipeConnectorReadFileBufferMemoryControlEnabled();
+  public boolean isPipeSinkReadFileBufferMemoryControlEnabled() {
+    return COMMON_CONFIG.isPipeSinkReadFileBufferMemoryControlEnabled();
   }
 
-  public long getPipeConnectorRetryIntervalMs() {
-    return COMMON_CONFIG.getPipeConnectorRetryIntervalMs();
+  public long getPipeSinkRetryIntervalMs() {
+    return COMMON_CONFIG.getPipeSinkRetryIntervalMs();
   }
 
-  public boolean isPipeConnectorRPCThriftCompressionEnabled() {
-    return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled();
+  public boolean isPipeSinkRetryLocallyForConnectionError() {
+    return COMMON_CONFIG.isPipeSinkRetryLocallyForConnectionError();
+  }
+
+  public boolean isPipeSinkRPCThriftCompressionEnabled() {
+    return COMMON_CONFIG.isPipeSinkRPCThriftCompressionEnabled();
   }
 
   public int getPipeAsyncSinkForcedRetryTsFileEventQueueSize() {
@@ -203,27 +207,27 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTotalEventQueueSize();
   }
 
-  public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
-    return COMMON_CONFIG.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall();
+  public long getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall() {
+    return COMMON_CONFIG.getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall();
   }
 
-  public int getPipeAsyncConnectorSelectorNumber() {
-    return COMMON_CONFIG.getPipeAsyncConnectorSelectorNumber();
+  public int getPipeAsyncSinkSelectorNumber() {
+    return COMMON_CONFIG.getPipeAsyncSinkSelectorNumber();
   }
 
-  public int getPipeAsyncConnectorMaxClientNumber() {
-    return COMMON_CONFIG.getPipeAsyncConnectorMaxClientNumber();
+  public int getPipeAsyncSinkMaxClientNumber() {
+    return COMMON_CONFIG.getPipeAsyncSinkMaxClientNumber();
   }
 
-  public int getPipeAsyncConnectorMaxTsFileClientNumber() {
-    return COMMON_CONFIG.getPipeAsyncConnectorMaxTsFileClientNumber();
+  public int getPipeAsyncSinkMaxTsFileClientNumber() {
+    return COMMON_CONFIG.getPipeAsyncSinkMaxTsFileClientNumber();
   }
 
   public double getPipeSendTsFileRateLimitBytesPerSecond() {
     return COMMON_CONFIG.getPipeSendTsFileRateLimitBytesPerSecond();
   }
 
-  public double getPipeAllConnectorsRateLimitBytesPerSecond() {
+  public double getPipeAllSinksRateLimitBytesPerSecond() {
     return COMMON_CONFIG.getPipeAllSinksRateLimitBytesPerSecond();
   }
 
@@ -231,8 +235,8 @@ public class PipeConfig {
     return COMMON_CONFIG.getRateLimiterHotReloadCheckIntervalMs();
   }
 
-  public int getPipeConnectorRequestSliceThresholdBytes() {
-    return COMMON_CONFIG.getPipeConnectorRequestSliceThresholdBytes();
+  public int getPipeSinkRequestSliceThresholdBytes() {
+    return COMMON_CONFIG.getPipeSinkRequestSliceThresholdBytes();
   }
 
   public long getPipeMaxReaderChunkSize() {
@@ -503,16 +507,14 @@ public class PipeConfig {
         getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes());
     LOGGER.info("PipeSourceMatcherCacheSize: {}", 
getPipeSourceMatcherCacheSize());
 
-    LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}", 
getPipeSinkHandshakeTimeoutMs());
-    LOGGER.info("PipeConnectorTransferTimeoutMs: {}", 
getPipeConnectorTransferTimeoutMs());
-    LOGGER.info("PipeConnectorReadFileBufferSize: {}", 
getPipeConnectorReadFileBufferSize());
-    LOGGER.info(
-        "PipeConnectorReadFileBufferMemoryControlEnabled: {}",
-        isPipeConnectorReadFileBufferMemoryControlEnabled());
-    LOGGER.info("PipeConnectorRetryIntervalMs: {}", 
getPipeConnectorRetryIntervalMs());
+    LOGGER.info("PipeSinkHandshakeTimeoutMs: {}", 
getPipeSinkHandshakeTimeoutMs());
+    LOGGER.info("PipeSinkTransferTimeoutMs: {}", 
getPipeSinkTransferTimeoutMs());
+    LOGGER.info("PipeSinkReadFileBufferSize: {}", 
getPipeSinkReadFileBufferSize());
     LOGGER.info(
-        "PipeConnectorRPCThriftCompressionEnabled: {}",
-        isPipeConnectorRPCThriftCompressionEnabled());
+        "PipeSinkReadFileBufferMemoryControlEnabled: {}",
+        isPipeSinkReadFileBufferMemoryControlEnabled());
+    LOGGER.info("PipeSinkRetryIntervalMs: {}", getPipeSinkRetryIntervalMs());
+    LOGGER.info("PipeSinkRPCThriftCompressionEnabled: {}", 
isPipeSinkRPCThriftCompressionEnabled());
     LOGGER.info(
         "PipeLeaderCacheMemoryUsagePercentage: {}", 
getPipeLeaderCacheMemoryUsagePercentage());
     LOGGER.info("PipeReaderChunkSize: {}", getPipeMaxReaderChunkSize());
@@ -559,25 +561,20 @@ public class PipeConfig {
         "PipeAsyncSinkForcedRetryTotalEventQueueSize: {}",
         getPipeAsyncSinkForcedRetryTotalEventQueueSize());
     LOGGER.info(
-        "PipeAsyncConnectorMaxRetryExecutionTimeMsPerCall: {}",
-        getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall());
-    LOGGER.info("PipeAsyncConnectorSelectorNumber: {}", 
getPipeAsyncConnectorSelectorNumber());
-    LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}", 
getPipeAsyncConnectorMaxClientNumber());
-    LOGGER.info(
-        "PipeAsyncConnectorMaxTsFileClientNumber: {}",
-        getPipeAsyncConnectorMaxTsFileClientNumber());
+        "PipeAsyncSinkMaxRetryExecutionTimeMsPerCall: {}",
+        getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall());
+    LOGGER.info("PipeAsyncSinkSelectorNumber: {}", 
getPipeAsyncSinkSelectorNumber());
+    LOGGER.info("PipeAsyncSinkMaxClientNumber: {}", 
getPipeAsyncSinkMaxClientNumber());
+    LOGGER.info("PipeAsyncSinkMaxTsFileClientNumber: {}", 
getPipeAsyncSinkMaxTsFileClientNumber());
 
     LOGGER.info(
         "PipeSendTsFileRateLimitBytesPerSecond: {}", 
getPipeSendTsFileRateLimitBytesPerSecond());
     LOGGER.info(
-        "PipeAllConnectorsRateLimitBytesPerSecond: {}",
-        getPipeAllConnectorsRateLimitBytesPerSecond());
+        "PipeAllSinksRateLimitBytesPerSecond: {}", 
getPipeAllSinksRateLimitBytesPerSecond());
     LOGGER.info(
         "RateLimiterHotReloadCheckIntervalMs: {}", 
getRateLimiterHotReloadCheckIntervalMs());
 
-    LOGGER.info(
-        "PipeConnectorRequestSliceThresholdBytes: {}",
-        getPipeConnectorRequestSliceThresholdBytes());
+    LOGGER.info("PipeSinkRequestSliceThresholdBytes: {}", 
getPipeSinkRequestSliceThresholdBytes());
 
     LOGGER.info("SeperatedPipeHeartbeatEnabled: {}", 
isSeperatedPipeHeartbeatEnabled());
     LOGGER.info(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 5e11df2d086..91dfd11e71f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -341,35 +341,42 @@ public class PipeDescriptor {
                     properties.getProperty(
                         "pipe_connector_handshake_timeout_ms",
                         
String.valueOf(config.getPipeSinkHandshakeTimeoutMs())))));
-    config.setPipeConnectorReadFileBufferSize(
+    config.setPipeSinkReadFileBufferSize(
         Integer.parseInt(
             
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_size"))
                 .orElse(
                     properties.getProperty(
                         "pipe_connector_read_file_buffer_size",
-                        
String.valueOf(config.getPipeConnectorReadFileBufferSize())))));
-    config.setIsPipeConnectorReadFileBufferMemoryControlEnabled(
+                        
String.valueOf(config.getPipeSinkReadFileBufferSize())))));
+    config.setIsPipeSinkReadFileBufferMemoryControlEnabled(
         Boolean.parseBoolean(
             
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_memory_control"))
                 .orElse(
                     properties.getProperty(
                         "pipe_connector_read_file_buffer_memory_control",
-                        String.valueOf(
-                            
config.isPipeConnectorReadFileBufferMemoryControlEnabled())))));
-    config.setPipeConnectorRetryIntervalMs(
+                        
String.valueOf(config.isPipeSinkReadFileBufferMemoryControlEnabled())))));
+    config.setPipeSinkRetryIntervalMs(
         Long.parseLong(
             
Optional.ofNullable(properties.getProperty("pipe_sink_retry_interval_ms"))
                 .orElse(
                     properties.getProperty(
                         "pipe_connector_retry_interval_ms",
-                        
String.valueOf(config.getPipeConnectorRetryIntervalMs())))));
-    config.setPipeConnectorRPCThriftCompressionEnabled(
+                        
String.valueOf(config.getPipeSinkRetryIntervalMs())))));
+    config.setPipeSinkRetryLocallyForConnectionError(
+        Boolean.parseBoolean(
+            Optional.ofNullable(
+                    
properties.getProperty("pipe_sink_retry_locally_for_connection_error"))
+                .orElse(
+                    properties.getProperty(
+                        "pipe_connector_retry_locally_for_connection_error",
+                        
String.valueOf(config.isPipeSinkRetryLocallyForConnectionError())))));
+    config.setPipeSinkRPCThriftCompressionEnabled(
         Boolean.parseBoolean(
             
Optional.ofNullable(properties.getProperty("pipe_sink_rpc_thrift_compression_enabled"))
                 .orElse(
                     properties.getProperty(
                         "pipe_connector_rpc_thrift_compression_enabled",
-                        
String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled())))));
+                        
String.valueOf(config.isPipeSinkRPCThriftCompressionEnabled())))));
     config.setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
         Long.parseLong(
             Optional.ofNullable(
@@ -377,8 +384,7 @@ public class PipeDescriptor {
                 .orElse(
                     properties.getProperty(
                         
"pipe_async_connector_max_retry_execution_time_ms_per_call",
-                        String.valueOf(
-                            
config.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())))));
+                        
String.valueOf(config.getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall())))));
     config.setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
         Integer.parseInt(
             Optional.ofNullable(
@@ -415,7 +421,7 @@ public class PipeDescriptor {
         Integer.parseInt(
             properties.getProperty(
                 "pipe_connector_request_slice_threshold_bytes",
-                
String.valueOf(config.getPipeConnectorRequestSliceThresholdBytes()))));
+                
String.valueOf(config.getPipeSinkRequestSliceThresholdBytes()))));
 
     config.setPipeReceiverLoginPeriodicVerificationIntervalMs(
         Long.parseLong(
@@ -566,7 +572,7 @@ public class PipeDescriptor {
         parserPipeConfig(
             properties, "pipe_sink_timeout_ms", "pipe_connector_timeout_ms", 
isHotModify);
     if (value != null) {
-      config.setPipeConnectorTransferTimeoutMs(Long.parseLong(value));
+      config.setPipeSinkTransferTimeoutMs(Long.parseLong(value));
     }
 
     value =
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
index 118f2f26876..917d4a74ed9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
@@ -56,7 +56,7 @@ public abstract class IoTDBClientManager {
   private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 
1 day
   private static final int FIRST_ADJUSTMENT_TIMEOUT_MS = 6 * 60 * 60 * 1000; 
// 6 hours
   protected static final AtomicInteger CONNECTION_TIMEOUT_MS =
-      new 
AtomicInteger(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+      new 
AtomicInteger(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs());
 
   protected IoTDBClientManager(
       List<TEndPoint> endPointList,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
index a0bb65b5a7e..b7f42295e6c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
@@ -100,7 +100,7 @@ public class IoTDBSyncClient extends 
IClientRPCService.Client
 
   @Override
   public TPipeTransferResp pipeTransfer(final TPipeTransferReq req) throws 
TException {
-    final int bodySizeLimit = 
PipeConfig.getInstance().getPipeConnectorRequestSliceThresholdBytes();
+    final int bodySizeLimit = 
PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes();
     if (req.getVersion() != IoTDBSinkRequestVersion.VERSION_1.getVersion()
         || req.body.limit() < bodySizeLimit) {
       return super.pipeTransfer(req);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
index fa5f0d3383c..969f42bfcec 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
@@ -195,7 +195,7 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
               new ThriftClientProperty.Builder()
                   
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs())
                   .setRpcThriftCompressionEnabled(
-                      PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
+                      PIPE_CONFIG.isPipeSinkRPCThriftCompressionEnabled())
                   .build(),
               endPoint.getIp(),
               endPoint.getPort(),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
index 9a6aba5b90b..e497656eab2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
@@ -28,6 +28,6 @@ public class GlobalRPCRateLimiter extends GlobalRateLimiter {
 
   @Override
   protected double getThroughputBytesPerSecond() {
-    return CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond();
+    return CONFIG.getPipeAllSinksRateLimitBytesPerSecond();
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
index dda8818e04f..763bfa644ea 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
@@ -238,7 +238,7 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
     } else {
       supportModsIfIsDataNodeReceiver = true;
     }
-    socket.setSoTimeout(PIPE_CONFIG.getPipeConnectorTransferTimeoutMs());
+    socket.setSoTimeout(PIPE_CONFIG.getPipeSinkTransferTimeoutMs());
     LOGGER.info("Handshake success. Socket: {}", socket);
   }
 
@@ -265,7 +265,7 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
       final AirGapSocket socket,
       final boolean isMultiFile)
       throws PipeException, IOException {
-    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
     final byte[] readBuffer = new byte[readFileBufferSize];
     long position = 0;
     try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
index 0668f7cc99b..66f3eda6a1a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
@@ -378,7 +378,7 @@ public abstract class IoTDBSink implements PipeConnector {
 
     nodeUrls.clear();
     nodeUrls.addAll(parseNodeUrls(parameters));
-    LOGGER.info("IoTDBConnector nodeUrls: {}", nodeUrls);
+    LOGGER.info("IoTDBSink nodeUrls: {}", nodeUrls);
 
     isTabletBatchModeEnabled =
         parameters.getBooleanOrDefault(
@@ -390,7 +390,7 @@ public abstract class IoTDBSink implements PipeConnector {
                     Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
                     CONNECTOR_FORMAT_HYBRID_VALUE)
                 .equals(CONNECTOR_FORMAT_TS_FILE_VALUE);
-    LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", 
isTabletBatchModeEnabled);
+    LOGGER.info("IoTDBSink isTabletBatchModeEnabled: {}", 
isTabletBatchModeEnabled);
 
     final boolean shouldMarkAsGeneralWriteRequest =
         parameters.getBooleanOrDefault(
@@ -406,7 +406,7 @@ public abstract class IoTDBSink implements PipeConnector {
               Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY, 
SINK_MARK_AS_PIPE_REQUEST_KEY),
               CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE);
     }
-    LOGGER.info("IoTDBConnector shouldMarkAsPipeRequest: {}", 
shouldMarkAsPipeRequest);
+    LOGGER.info("IoTDBSink shouldMarkAsPipeRequest: {}", 
shouldMarkAsPipeRequest);
 
     final String connectorSkipIfValue =
         parameters
@@ -425,7 +425,7 @@ public abstract class IoTDBSink implements PipeConnector {
       throw new PipeParameterNotValidException(
           String.format("Parameters in set %s are not allowed in 'skipif'", 
skipIfOptionSet));
     }
-    LOGGER.info("IoTDBConnector skipIfNoPrivileges: {}", skipIfNoPrivileges);
+    LOGGER.info("IoTDBSink skipIfNoPrivileges: {}", skipIfNoPrivileges);
 
     receiverStatusHandler =
         new PipeReceiverStatusHandler(
@@ -465,7 +465,7 @@ public abstract class IoTDBSink implements PipeConnector {
                 SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY),
             CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE);
     LOGGER.info(
-        "IoTDBConnector {} = {}",
+        "IoTDBSink {} = {}",
         CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY,
         shouldReceiverConvertOnTypeMismatch);
     isRealtimeFirst =
@@ -475,7 +475,7 @@ public abstract class IoTDBSink implements PipeConnector {
                 PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
             PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
     LOGGER.info(
-        "IoTDBConnector {} = {}", 
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, isRealtimeFirst);
+        "IoTDBSink {} = {}", PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, 
isRealtimeFirst);
   }
 
   protected LinkedHashSet<TEndPoint> parseNodeUrls(final PipeParameters 
parameters)
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
index 080efd25846..96b7346d025 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
@@ -175,7 +175,7 @@ public abstract class IoTDBSslSyncSink extends IoTDBSink {
       final Pair<IoTDBSyncClient, Boolean> clientAndStatus,
       final boolean isMultiFile)
       throws PipeException, IOException {
-    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
     final byte[] readBuffer = new byte[readFileBufferSize];
     long position = 0;
     try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {

Reply via email to