This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b0a9d76883 Pipe: Enabled locally retry for PipeConnectionException 
(#17182)
1b0a9d76883 is described below

commit 1b0a9d7688362fddd80c03cf822e19983cefeea7
Author: Caideyipi <[email protected]>
AuthorDate: Mon Feb 9 14:28:17 2026 +0800

    Pipe: Enabled locally retry for PipeConnectionException (#17182)
    
    * fix
    
    * may-fix
    
    * fix
    
    * fix
    
    * try-complete
    
    * fix-part
    
    * Update IoTDBDataRegionAsyncSink.java
    
    * fix
    
    * fix
---
 .../iotdb/tool/tsfile/ImportTsFileRemotely.java    |   9 +-
 .../exchange/sender/TwoStageAggregateSender.java   |   5 +-
 .../protocol/airgap/IoTDBAirGapReceiver.java       |   2 +-
 .../client/IoTDBDataNodeAsyncClientManager.java    |   4 +-
 .../sink/protocol/legacy/IoTDBLegacyPipeSink.java  |   2 +-
 .../pipeconsensus/PipeConsensusSyncSink.java       |   2 +-
 .../PipeConsensusTsFileInsertionEventHandler.java  |   2 +-
 .../thrift/async/IoTDBDataRegionAsyncSink.java     |  37 +++---
 .../PipeTransferTabletBatchEventHandler.java       |   2 +-
 .../PipeTransferTabletInsertionEventHandler.java   |   2 +-
 .../async/handler/PipeTransferTsFileHandler.java   |   8 +-
 .../GeneralRegionAttributeSecurityService.java     |   4 +-
 .../iotdb/commons/client/ClientPoolFactory.java    |  20 ++--
 .../apache/iotdb/commons/conf/CommonConfig.java    | 129 ++++++++++++---------
 .../task/subtask/PipeAbstractSinkSubtask.java      |  20 +++-
 .../agent/task/subtask/PipeReportableSubtask.java  |  23 ++--
 .../iotdb/commons/pipe/config/PipeConfig.java      |  95 ++++++++-------
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |  36 +++---
 .../pipe/sink/client/IoTDBClientManager.java       |   2 +-
 .../commons/pipe/sink/client/IoTDBSyncClient.java  |   2 +-
 .../pipe/sink/client/IoTDBSyncClientManager.java   |   4 +-
 .../pipe/sink/limiter/GlobalRPCRateLimiter.java    |   2 +-
 .../pipe/sink/protocol/IoTDBAirGapSink.java        |   4 +-
 .../commons/pipe/sink/protocol/IoTDBSink.java      |  12 +-
 .../pipe/sink/protocol/IoTDBSslSyncSink.java       |   2 +-
 25 files changed, 230 insertions(+), 200 deletions(-)

diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
index efb7f2abdbf..aca33ba4dbf 100644
--- 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
@@ -164,7 +164,7 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
                 "Handshake error with target server ip: %s, port: %s, because: 
%s.",
                 client.getIpAddress(), client.getPort(), resp.getStatus()));
       } else {
-        
client.setTimeout(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+        
client.setTimeout(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs());
         IOT_PRINTER.println(
             String.format(
                 "Handshake success. Target server ip: %s, port: %s",
@@ -232,7 +232,7 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
 
   private void transferFilePieces(final File file, final boolean isMultiFile)
       throws PipeException, IOException {
-    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
     final byte[] readBuffer = new byte[readFileBufferSize];
     long position = 0;
     try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
@@ -299,10 +299,9 @@ public class ImportTsFileRemotely extends ImportTsFileBase 
{
       this.client =
           new IoTDBSyncClient(
               new ThriftClientProperty.Builder()
-                  .setConnectionTimeoutMs(
-                      
PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs())
+                  
.setConnectionTimeoutMs(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs())
                   .setRpcThriftCompressionEnabled(
-                      
PipeConfig.getInstance().isPipeConnectorRPCThriftCompressionEnabled())
+                      
PipeConfig.getInstance().isPipeSinkRPCThriftCompressionEnabled())
                   .build(),
               getEndPoint().getIp(),
               getEndPoint().getPort(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
index bac357368c0..3c36559a300 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
@@ -208,9 +208,8 @@ public class TwoStageAggregateSender implements 
AutoCloseable {
   private IoTDBSyncClient constructIoTDBSyncClient(TEndPoint endPoint) throws 
TTransportException {
     return new IoTDBSyncClient(
         new ThriftClientProperty.Builder()
-            
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
-            .setRpcThriftCompressionEnabled(
-                PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
+            
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs())
+            
.setRpcThriftCompressionEnabled(PIPE_CONFIG.isPipeSinkRPCThriftCompressionEnabled())
             .build(),
         endPoint.getIp(),
         endPoint.getPort(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 3cea6c998f8..610b9e5fe1a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -70,7 +70,7 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
 
   @Override
   public void runMayThrow() throws Throwable {
-    
socket.setSoTimeout(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+    
socket.setSoTimeout(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs());
     socket.setKeepAlive(true);
 
     LOGGER.info("Pipe air gap receiver {} started. Socket: {}", receiverId, 
socket);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 5ee7f22bc07..73e2213eea6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -312,7 +312,7 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
           PipeTransferHandshakeConstant.HANDSHAKE_KEY_SKIP_IF,
           Boolean.toString(skipIfNoPrivileges));
 
-      
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
+      
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs());
       
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params),
 callback);
       waitHandshakeFinished(isHandshakeFinished);
 
@@ -331,7 +331,7 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
         resp.set(null);
         exception.set(null);
 
-        
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
+        
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs());
         client.pipeTransfer(
             PipeTransferDataNodeHandshakeV1Req.toTPipeTransferReq(
                 
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
index 332ca6bab7c..e296a7e0faa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
@@ -389,7 +389,7 @@ public class IoTDBLegacyPipeSink implements PipeConnector {
     long position = 0;
 
     // Try small piece to rebase the file position.
-    final byte[] buffer = new 
byte[PipeConfig.getInstance().getPipeConnectorReadFileBufferSize()];
+    final byte[] buffer = new 
byte[PipeConfig.getInstance().getPipeSinkReadFileBufferSize()];
     try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file, 
"r")) {
       while (true) {
         final int dataLength = randomAccessFile.read(buffer);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
index bb0dcf2dc09..50972ee4454 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java
@@ -446,7 +446,7 @@ public class PipeConsensusSyncSink extends IoTDBSink {
       final TCommitId tCommitId,
       final TConsensusGroupId tConsensusGroupId)
       throws PipeException, IOException {
-    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
     final byte[] readBuffer = new byte[readFileBufferSize];
     long position = 0;
     try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
index 485096173c1..630262d4485 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
@@ -104,7 +104,7 @@ public class PipeConsensusTsFileInsertionEventHandler
     transferMod = event.isWithMod();
     currentFile = transferMod ? modFile : tsFile;
 
-    readFileBufferSize = 
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+    readFileBufferSize = 
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
     readBuffer = new byte[readFileBufferSize];
     position = 0;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index e6e368a5280..f8d0b104096 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.thrift.async;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.audit.UserEntity;
+import org.apache.iotdb.commons.client.ThriftClient;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
@@ -59,6 +60,7 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
@@ -127,6 +129,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
       new ConcurrentHashMap<>();
 
   private boolean enableSendTsFileLimit;
+  private volatile boolean isConnectionException;
 
   @Override
   public void validate(final PipeParameterValidator validator) throws 
Exception {
@@ -261,10 +264,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
                   false,
                   sealedFile.left));
         }
-      } catch (final Throwable t) {
-        LOGGER.warn("Failed to transfer tsfile batch ({}).", dbTsFilePairs, t);
+      } catch (final Exception e) {
+        LOGGER.warn("Failed to transfer tsfile batch ({}).", dbTsFilePairs, e);
         if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
-          addFailureEventsToRetryQueue(events);
+          addFailureEventsToRetryQueue(events, e);
         }
       }
     } else {
@@ -599,7 +602,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
 
       // Stop retrying if the execution time exceeds the threshold for better 
realtime performance
       if (System.currentTimeMillis() - retryStartTime
-          > 
PipeConfig.getInstance().getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall()) 
{
+          > 
PipeConfig.getInstance().getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall()) {
         if (retryEventQueueEventCounter.getTabletInsertionEventCount()
                 < 
PipeConfig.getInstance().getPipeAsyncSinkForcedRetryTabletEventQueueSize()
             && retryEventQueueEventCounter.getTsFileInsertionEventCount()
@@ -610,14 +613,17 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
         }
 
         if (remainingEvents <= retryEventQueue.size() + 
retryTsFileQueue.size()) {
-          throw new PipeException(
+          final String message =
               "Failed to retry transferring events in the retry queue. 
Remaining events: "
                   + (retryEventQueue.size() + retryTsFileQueue.size())
                   + " (tablet events: "
                   + retryEventQueueEventCounter.getTabletInsertionEventCount()
                   + ", tsfile events: "
                   + retryEventQueueEventCounter.getTsFileInsertionEventCount()
-                  + ").");
+                  + ").";
+          throw isConnectionException
+              ? new PipeConnectionException(message)
+              : new PipeException(message);
         }
       }
     }
@@ -633,7 +639,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
               
.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), false);
         }
       } catch (final Exception e) {
-        addFailureEventToRetryQueue(tabletInsertionEvent);
+        addFailureEventToRetryQueue(tabletInsertionEvent, e);
       }
       return;
     }
@@ -646,14 +652,14 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
               
.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), false);
         }
       } else {
-        addFailureEventToRetryQueue(tabletInsertionEvent);
+        addFailureEventToRetryQueue(tabletInsertionEvent, null);
       }
     } catch (final Exception e) {
       if (tabletInsertionEvent instanceof EnrichedEvent) {
         ((EnrichedEvent) tabletInsertionEvent)
             .decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(), 
false);
       }
-      addFailureEventToRetryQueue(tabletInsertionEvent);
+      addFailureEventToRetryQueue(tabletInsertionEvent, e);
     }
   }
 
@@ -663,11 +669,11 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
         tsFileInsertionEvent.decreaseReferenceCount(
             IoTDBDataRegionAsyncSink.class.getName(), false);
       } else {
-        addFailureEventToRetryQueue(tsFileInsertionEvent);
+        addFailureEventToRetryQueue(tsFileInsertionEvent, null);
       }
     } catch (final Exception e) {
       
tsFileInsertionEvent.decreaseReferenceCount(IoTDBDataRegionAsyncSink.class.getName(),
 false);
-      addFailureEventToRetryQueue(tsFileInsertionEvent);
+      addFailureEventToRetryQueue(tsFileInsertionEvent, e);
     }
   }
 
@@ -677,7 +683,9 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
    * @param event {@link Event} to retry
    */
   @SuppressWarnings("java:S899")
-  public void addFailureEventToRetryQueue(final Event event) {
+  public void addFailureEventToRetryQueue(final Event event, final Exception 
e) {
+    isConnectionException =
+        e instanceof PipeConnectionException || 
ThriftClient.isConnectionBroken(e);
     if (event instanceof EnrichedEvent && ((EnrichedEvent) 
event).isReleased()) {
       return;
     }
@@ -713,8 +721,9 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
    *
    * @param events {@link EnrichedEvent}s to retry
    */
-  public void addFailureEventsToRetryQueue(final Iterable<EnrichedEvent> 
events) {
-    events.forEach(this::addFailureEventToRetryQueue);
+  public void addFailureEventsToRetryQueue(
+      final Iterable<EnrichedEvent> events, final Exception e) {
+    events.forEach(event -> addFailureEventToRetryQueue(event, e));
   }
 
   public boolean isEnableSendTsFileLimit() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 89baaa02794..d95513a2228 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -123,7 +123,7 @@ public class PipeTransferTabletBatchEventHandler extends 
PipeTransferTrackableHa
           events.size(),
           
events.stream().map(EnrichedEvent::getPipeName).collect(Collectors.toSet()));
     } finally {
-      connector.addFailureEventsToRetryQueue(events);
+      connector.addFailureEventsToRetryQueue(events, exception);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index df26325a5ca..ca92af92b19 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -98,7 +98,7 @@ public abstract class PipeTransferTabletInsertionEventHandler 
extends PipeTransf
           event.getCommitterKey(),
           event.getCommitId());
     } finally {
-      connector.addFailureEventToRetryQueue(event);
+      connector.addFailureEventToRetryQueue(event, exception);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 3ef46034554..6726c9a6701 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -126,7 +126,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     readFileBufferSize =
         (int)
             Math.min(
-                PipeConfig.getInstance().getPipeConnectorReadFileBufferSize(),
+                PipeConfig.getInstance().getPipeSinkReadFileBufferSize(),
                 transferMod ? Math.max(tsFile.length(), modFile.length()) : 
tsFile.length());
     position = 0;
 
@@ -146,7 +146,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
       memoryBlock =
           PipeDataNodeResourceManager.memory()
               .forceAllocateForTsFileWithRetry(
-                  
PipeConfig.getInstance().isPipeConnectorReadFileBufferMemoryControlEnabled()
+                  
PipeConfig.getInstance().isPipeSinkReadFileBufferMemoryControlEnabled()
                       ? readFileBufferSize
                       : 0);
       readBuffer = new byte[readFileBufferSize];
@@ -415,7 +415,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
         returnClientIfNecessary();
       } finally {
         if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
-          connector.addFailureEventsToRetryQueue(events);
+          connector.addFailureEventsToRetryQueue(events, exception);
         }
       }
     }
@@ -477,7 +477,7 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
    * @param timeoutMs CAN NOT BE UNLIMITED, otherwise it may cause deadlock.
    */
   private void waitForResourceEnough4Slicing(final long timeoutMs) throws 
InterruptedException {
-    if 
(!PipeConfig.getInstance().isPipeConnectorReadFileBufferMemoryControlEnabled()) 
{
+    if 
(!PipeConfig.getInstance().isPipeSinkReadFileBufferMemoryControlEnabled()) {
       return;
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java
index 1bff56500ea..408cc121590 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/attribute/update/GeneralRegionAttributeSecurityService.java
@@ -102,9 +102,7 @@ public class GeneralRegionAttributeSecurityService extends 
AbstractPeriodicalSer
     // UpdateClearContainer and version / TEndPoint are not calculated
     final AtomicInteger limit =
         new AtomicInteger(
-            CommonDescriptor.getInstance()
-                .getConfig()
-                .getPipeConnectorRequestSliceThresholdBytes());
+            
CommonDescriptor.getInstance().getConfig().getPipeSinkRequestSliceThresholdBytes());
 
     final AtomicBoolean hasRemaining = new AtomicBoolean(false);
     final Map<SchemaRegionId, Pair<Long, Map<TDataNodeLocation, byte[]>>> 
attributeUpdateCommitMap =
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index 7ed34359b21..9e41b585ec4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -281,15 +281,13 @@ public class ClientPoolFactory {
               new AsyncPipeDataTransferServiceClient.Factory(
                   manager,
                   new ThriftClientProperty.Builder()
-                      
.setConnectionTimeoutMs(conf.getPipeConnectorTransferTimeoutMs())
-                      .setRpcThriftCompressionEnabled(
-                          conf.isPipeConnectorRPCThriftCompressionEnabled())
-                      .setSelectorNumOfAsyncClientManager(
-                          conf.getPipeAsyncConnectorSelectorNumber())
+                      
.setConnectionTimeoutMs(conf.getPipeSinkTransferTimeoutMs())
+                      
.setRpcThriftCompressionEnabled(conf.isPipeSinkRPCThriftCompressionEnabled())
+                      
.setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber())
                       .build(),
                   ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
               new 
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
-                  
.setMaxClientNumForEachNode(conf.getPipeAsyncConnectorMaxClientNumber())
+                  
.setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxClientNumber())
                   .build()
                   .getConfig());
       ClientManagerMetrics.getInstance()
@@ -308,16 +306,14 @@ public class ClientPoolFactory {
               new AsyncPipeDataTransferServiceClient.Factory(
                   manager,
                   new ThriftClientProperty.Builder()
-                      
.setConnectionTimeoutMs(conf.getPipeConnectorTransferTimeoutMs())
-                      .setRpcThriftCompressionEnabled(
-                          conf.isPipeConnectorRPCThriftCompressionEnabled())
-                      .setSelectorNumOfAsyncClientManager(
-                          conf.getPipeAsyncConnectorSelectorNumber())
+                      
.setConnectionTimeoutMs(conf.getPipeSinkTransferTimeoutMs())
+                      
.setRpcThriftCompressionEnabled(conf.isPipeSinkRPCThriftCompressionEnabled())
+                      
.setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber())
                       
.setPrintLogWhenEncounterException(conf.isPrintLogWhenEncounterException())
                       .build(),
                   ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
               new 
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
-                  
.setMaxClientNumForEachNode(conf.getPipeAsyncConnectorMaxTsFileClientNumber())
+                  
.setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxTsFileClientNumber())
                   .build()
                   .getConfig());
       ClientManagerMetrics.getInstance()
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 9990c6a958b..0e44d0ab1a2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -268,12 +268,13 @@ public class CommonConfig {
   private long pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes = 72 * KB;
   private long pipeSourceMatcherCacheSize = 1024;
 
-  private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
-  private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
-  private int pipeConnectorReadFileBufferSize = 5242880; // 5MB
-  private boolean isPipeConnectorReadFileBufferMemoryControlEnabled = false;
-  private long pipeConnectorRetryIntervalMs = 1000L;
-  private boolean pipeConnectorRPCThriftCompressionEnabled = false;
+  private int pipeSinkHandshakeTimeoutMs = 10 * 1000; // 10 seconds
+  private int pipeSinkTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
+  private int pipeSinkReadFileBufferSize = 5242880; // 5MB
+  private boolean isPipeSinkReadFileBufferMemoryControlEnabled = false;
+  private long pipeSinkRetryIntervalMs = 800L;
+  private boolean pipeSinkRetryLocallyForConnectionError = true;
+  private boolean pipeSinkRPCThriftCompressionEnabled = false;
 
   private int pipeAsyncSinkForcedRetryTsFileEventQueueSize = 5;
   private int pipeAsyncSinkForcedRetryTabletEventQueueSize = 20;
@@ -1052,88 +1053,85 @@ public class CommonConfig {
     logger.info("pipeSourceMatcherCacheSize is set to {}.", 
pipeSourceMatcherCacheSize);
   }
 
-  public int getPipeConnectorHandshakeTimeoutMs() {
-    return pipeConnectorHandshakeTimeoutMs;
+  public int getPipeSinkHandshakeTimeoutMs() {
+    return pipeSinkHandshakeTimeoutMs;
   }
 
-  public void setPipeConnectorHandshakeTimeoutMs(long 
pipeConnectorHandshakeTimeoutMs) {
-    final int fPipeConnectorHandshakeTimeoutMs = 
this.pipeConnectorHandshakeTimeoutMs;
+  public void setPipeSinkHandshakeTimeoutMs(long pipeSinkHandshakeTimeoutMs) {
+    final int fPipeConnectorHandshakeTimeoutMs = 
this.pipeSinkHandshakeTimeoutMs;
     try {
-      this.pipeConnectorHandshakeTimeoutMs = 
Math.toIntExact(pipeConnectorHandshakeTimeoutMs);
+      this.pipeSinkHandshakeTimeoutMs = 
Math.toIntExact(pipeSinkHandshakeTimeoutMs);
     } catch (ArithmeticException e) {
-      this.pipeConnectorHandshakeTimeoutMs = Integer.MAX_VALUE;
+      this.pipeSinkHandshakeTimeoutMs = Integer.MAX_VALUE;
       logger.warn(
           "Given pipe connector handshake timeout is too large, set to {} 
ms.", Integer.MAX_VALUE);
     } finally {
-      if (fPipeConnectorHandshakeTimeoutMs != 
this.pipeConnectorHandshakeTimeoutMs) {
+      if (fPipeConnectorHandshakeTimeoutMs != this.pipeSinkHandshakeTimeoutMs) 
{
         logger.info(
-            "pipeConnectorHandshakeTimeoutMs is set to {}.", 
this.pipeConnectorHandshakeTimeoutMs);
+            "pipeConnectorHandshakeTimeoutMs is set to {}.", 
this.pipeSinkHandshakeTimeoutMs);
       }
     }
   }
 
-  public int getPipeConnectorTransferTimeoutMs() {
-    return pipeConnectorTransferTimeoutMs;
+  public int getPipeSinkTransferTimeoutMs() {
+    return pipeSinkTransferTimeoutMs;
   }
 
-  public void setPipeConnectorTransferTimeoutMs(long 
pipeConnectorTransferTimeoutMs) {
-    final int fPipeConnectorTransferTimeoutMs = 
this.pipeConnectorTransferTimeoutMs;
+  public void setPipeSinkTransferTimeoutMs(long pipeSinkTransferTimeoutMs) {
+    final int fPipeConnectorTransferTimeoutMs = this.pipeSinkTransferTimeoutMs;
     try {
-      this.pipeConnectorTransferTimeoutMs = 
Math.toIntExact(pipeConnectorTransferTimeoutMs);
+      this.pipeSinkTransferTimeoutMs = 
Math.toIntExact(pipeSinkTransferTimeoutMs);
     } catch (ArithmeticException e) {
-      this.pipeConnectorTransferTimeoutMs = Integer.MAX_VALUE;
+      this.pipeSinkTransferTimeoutMs = Integer.MAX_VALUE;
       logger.warn(
           "Given pipe connector transfer timeout is too large, set to {} ms.", 
Integer.MAX_VALUE);
     } finally {
-      if (fPipeConnectorTransferTimeoutMs != 
this.pipeConnectorTransferTimeoutMs) {
-        logger.info("pipeConnectorTransferTimeoutMs is set to {}.", 
pipeConnectorTransferTimeoutMs);
+      if (fPipeConnectorTransferTimeoutMs != this.pipeSinkTransferTimeoutMs) {
+        logger.info("pipeConnectorTransferTimeoutMs is set to {}.", 
pipeSinkTransferTimeoutMs);
       }
     }
   }
 
-  public int getPipeConnectorReadFileBufferSize() {
-    return pipeConnectorReadFileBufferSize;
+  public int getPipeSinkReadFileBufferSize() {
+    return pipeSinkReadFileBufferSize;
   }
 
-  public void setPipeConnectorReadFileBufferSize(int 
pipeConnectorReadFileBufferSize) {
-    if (this.pipeConnectorReadFileBufferSize == 
pipeConnectorReadFileBufferSize) {
+  public void setPipeSinkReadFileBufferSize(int pipeSinkReadFileBufferSize) {
+    if (this.pipeSinkReadFileBufferSize == pipeSinkReadFileBufferSize) {
       return;
     }
-    this.pipeConnectorReadFileBufferSize = pipeConnectorReadFileBufferSize;
-    logger.info("pipeConnectorReadFileBufferSize is set to {}.", 
pipeConnectorReadFileBufferSize);
+    this.pipeSinkReadFileBufferSize = pipeSinkReadFileBufferSize;
+    logger.info("pipeConnectorReadFileBufferSize is set to {}.", 
pipeSinkReadFileBufferSize);
   }
 
-  public boolean isPipeConnectorReadFileBufferMemoryControlEnabled() {
-    return isPipeConnectorReadFileBufferMemoryControlEnabled;
+  public boolean isPipeSinkReadFileBufferMemoryControlEnabled() {
+    return isPipeSinkReadFileBufferMemoryControlEnabled;
   }
 
-  public void setIsPipeConnectorReadFileBufferMemoryControlEnabled(
-      boolean isPipeConnectorReadFileBufferMemoryControlEnabled) {
-    if (this.isPipeConnectorReadFileBufferMemoryControlEnabled
-        == isPipeConnectorReadFileBufferMemoryControlEnabled) {
+  public void setIsPipeSinkReadFileBufferMemoryControlEnabled(
+      boolean isPipeSinkReadFileBufferMemoryControlEnabled) {
+    if (this.isPipeSinkReadFileBufferMemoryControlEnabled
+        == isPipeSinkReadFileBufferMemoryControlEnabled) {
       return;
     }
-    this.isPipeConnectorReadFileBufferMemoryControlEnabled =
-        isPipeConnectorReadFileBufferMemoryControlEnabled;
+    this.isPipeSinkReadFileBufferMemoryControlEnabled =
+        isPipeSinkReadFileBufferMemoryControlEnabled;
     logger.info(
-        "isPipeConnectorReadFileBufferMemoryControlEnabled is set to {}.",
-        isPipeConnectorReadFileBufferMemoryControlEnabled);
+        "isPipeSinkReadFileBufferMemoryControlEnabled is set to {}.",
+        isPipeSinkReadFileBufferMemoryControlEnabled);
   }
 
-  public void setPipeConnectorRPCThriftCompressionEnabled(
-      boolean pipeConnectorRPCThriftCompressionEnabled) {
-    if (this.isPipeConnectorReadFileBufferMemoryControlEnabled
-        == pipeConnectorRPCThriftCompressionEnabled) {
+  public void setPipeSinkRPCThriftCompressionEnabled(boolean 
pipeSinkRPCThriftCompressionEnabled) {
+    if (this.isPipeSinkReadFileBufferMemoryControlEnabled == 
pipeSinkRPCThriftCompressionEnabled) {
       return;
     }
-    this.pipeConnectorRPCThriftCompressionEnabled = 
pipeConnectorRPCThriftCompressionEnabled;
+    this.pipeSinkRPCThriftCompressionEnabled = 
pipeSinkRPCThriftCompressionEnabled;
     logger.info(
-        "pipeConnectorRPCThriftCompressionEnabled is set to {}.",
-        pipeConnectorRPCThriftCompressionEnabled);
+        "pipeSinkRPCThriftCompressionEnabled is set to {}.", 
pipeSinkRPCThriftCompressionEnabled);
   }
 
-  public boolean isPipeConnectorRPCThriftCompressionEnabled() {
-    return pipeConnectorRPCThriftCompressionEnabled;
+  public boolean isPipeSinkRPCThriftCompressionEnabled() {
+    return pipeSinkRPCThriftCompressionEnabled;
   }
 
   public void setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
@@ -1199,11 +1197,11 @@ public class CommonConfig {
         pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall);
   }
 
-  public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
+  public long getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall() {
     return pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall;
   }
 
-  public int getPipeAsyncConnectorSelectorNumber() {
+  public int getPipeAsyncSinkSelectorNumber() {
     return pipeAsyncConnectorSelectorNumber;
   }
 
@@ -1221,7 +1219,7 @@ public class CommonConfig {
     logger.info("pipeAsyncConnectorSelectorNumber is set to {}.", 
pipeAsyncConnectorSelectorNumber);
   }
 
-  public int getPipeAsyncConnectorMaxClientNumber() {
+  public int getPipeAsyncSinkMaxClientNumber() {
     return pipeAsyncConnectorMaxClientNumber;
   }
 
@@ -1240,7 +1238,7 @@ public class CommonConfig {
         "pipeAsyncConnectorMaxClientNumber is set to {}.", 
pipeAsyncConnectorMaxClientNumber);
   }
 
-  public int getPipeAsyncConnectorMaxTsFileClientNumber() {
+  public int getPipeAsyncSinkMaxTsFileClientNumber() {
     return pipeAsyncConnectorMaxTsFileClientNumber;
   }
 
@@ -1357,16 +1355,31 @@ public class CommonConfig {
     logger.info("pipeAutoRestartEnabled is set to {}.", 
pipeAutoRestartEnabled);
   }
 
-  public long getPipeConnectorRetryIntervalMs() {
-    return pipeConnectorRetryIntervalMs;
+  public long getPipeSinkRetryIntervalMs() {
+    return pipeSinkRetryIntervalMs;
   }
 
-  public void setPipeConnectorRetryIntervalMs(long 
pipeConnectorRetryIntervalMs) {
-    if (this.pipeConnectorRetryIntervalMs == pipeConnectorRetryIntervalMs) {
+  public void setPipeSinkRetryIntervalMs(long pipeConnectorRetryIntervalMs) {
+    if (this.pipeSinkRetryIntervalMs == pipeConnectorRetryIntervalMs) {
       return;
     }
-    this.pipeConnectorRetryIntervalMs = pipeConnectorRetryIntervalMs;
-    logger.info("pipeConnectorRetryIntervalMs is set to {}", 
pipeConnectorRetryIntervalMs);
+    this.pipeSinkRetryIntervalMs = pipeConnectorRetryIntervalMs;
+    logger.info("pipeSinkRetryIntervalMs is set to {}", 
pipeConnectorRetryIntervalMs);
+  }
+
+  public boolean isPipeSinkRetryLocallyForConnectionError() {
+    return pipeSinkRetryLocallyForConnectionError;
+  }
+
+  public void setPipeSinkRetryLocallyForConnectionError(
+      boolean pipeSinkRetryLocallyForConnectionError) {
+    if (this.pipeSinkRetryLocallyForConnectionError == 
pipeSinkRetryLocallyForConnectionError) {
+      return;
+    }
+    this.pipeSinkRetryLocallyForConnectionError = 
pipeSinkRetryLocallyForConnectionError;
+    logger.info(
+        "pipeSinkRetryLocallyForConnectionError is set to {}",
+        pipeSinkRetryLocallyForConnectionError);
   }
 
   public int 
getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount() {
@@ -2101,7 +2114,7 @@ public class CommonConfig {
         "rateLimiterHotReloadCheckIntervalMs is set to {}", 
rateLimiterHotReloadCheckIntervalMs);
   }
 
-  public int getPipeConnectorRequestSliceThresholdBytes() {
+  public int getPipeSinkRequestSliceThresholdBytes() {
     return pipeConnectorRequestSliceThresholdBytes;
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 5f7156f6324..ccf16bfa753 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -134,6 +134,12 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
           // return if the pipe task should be stopped
           return;
         }
+        if 
(PipeConfig.getInstance().isPipeSinkRetryLocallyForConnectionError()) {
+          super.onFailure(
+              new PipeRuntimeSinkNonReportTimeConfigurableException(
+                  throwable.getMessage(), Long.MAX_VALUE));
+          return;
+        }
       }
 
       // Handle exceptions if any available clients exist
@@ -144,9 +150,10 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
         super.onFailure(throwable);
       } else {
         // Print stack trace for better debugging
-        LOGGER.warn(
-            "A non PipeRuntimeConnectorCriticalException occurred, will throw 
a PipeRuntimeConnectorCriticalException.",
-            throwable);
+        PipeLogger.log(
+            LOGGER::warn,
+            throwable,
+            "A non PipeRuntimeSinkCriticalException occurred, will throw a 
PipeRuntimeSinkCriticalException.");
         super.onFailure(new 
PipeRuntimeSinkCriticalException(throwable.getMessage()));
       }
     }
@@ -179,8 +186,7 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
             MAX_RETRY_TIMES,
             e);
         try {
-          sleepIfNoHighPriorityTask(
-              retry * 
PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
+          sleepIfNoHighPriorityTask(retry * 
PipeConfig.getInstance().getPipeSinkRetryIntervalMs());
         } catch (final InterruptedException interruptedException) {
           LOGGER.info(
               "Interrupted while sleeping, will retry to handshake with the 
target system.",
@@ -192,7 +198,9 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
 
     // Stop current pipe task directly if failed to reconnect to
     // the target system after MAX_RETRY_TIMES times
-    if (retry == MAX_RETRY_TIMES && lastEvent instanceof EnrichedEvent) {
+    if (retry == MAX_RETRY_TIMES
+        && lastEvent instanceof EnrichedEvent
+        && 
!PipeConfig.getInstance().isPipeSinkRetryLocallyForConnectionError()) {
       report(
           (EnrichedEvent) lastEvent,
           new PipeRuntimeSinkCriticalException(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
index f290f8c4965..3c1a63064af 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeReportableSubtask.java
@@ -22,8 +22,10 @@ package org.apache.iotdb.commons.pipe.agent.task.subtask;
 import 
org.apache.iotdb.commons.exception.pipe.PipeConsensusRetryWithIncreasingIntervalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
+import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkRetryTimesConfigurableException;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,10 +51,11 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
       return;
     }
 
-    if (lastEvent instanceof EnrichedEvent) {
-      onEnrichedEventFailure(throwable);
+    if (lastEvent instanceof EnrichedEvent
+        && !(throwable instanceof 
PipeRuntimeSinkNonReportTimeConfigurableException)) {
+      onReportEventFailure(throwable);
     } else {
-      onNonEnrichedEventFailure(throwable);
+      onNonReportEventFailure(throwable);
     }
 
     // Although the pipe task will be stopped, we still don't release the last 
event here
@@ -75,7 +78,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
     return sleepInterval;
   }
 
-  private void onEnrichedEventFailure(final Throwable throwable) {
+  private void onReportEventFailure(final Throwable throwable) {
     final int maxRetryTimes =
         throwable instanceof PipeRuntimeSinkRetryTimesConfigurableException
             ? ((PipeRuntimeSinkRetryTimesConfigurableException) 
throwable).getRetryTimes()
@@ -94,15 +97,16 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
 
     retryCount.incrementAndGet();
     if (retryCount.get() <= maxRetryTimes) {
-      LOGGER.warn(
+      PipeLogger.log(
+          LOGGER::warn,
+          throwable,
           "Retry executing subtask {} (creation time: {}, simple class: {}), 
retry count [{}/{}], last exception: {}",
           taskID,
           creationTime,
           this.getClass().getSimpleName(),
           retryCount.get(),
           maxRetryTimes,
-          throwable.getMessage(),
-          throwable);
+          throwable.getMessage());
       try {
         sleepIfNoHighPriorityTask(getSleepIntervalBasedOnThrowable(throwable));
       } catch (final InterruptedException e) {
@@ -149,7 +153,7 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
 
   protected abstract void report(final EnrichedEvent event, final 
PipeRuntimeException exception);
 
-  private void onNonEnrichedEventFailure(final Throwable throwable) {
+  private void onNonReportEventFailure(final Throwable throwable) {
     if (retryCount.get() == 0) {
       LOGGER.warn(
           "Failed to execute subtask {} (creation time: {}, simple class: {}), 
"
@@ -162,7 +166,8 @@ public abstract class PipeReportableSubtask extends 
PipeSubtask {
     }
 
     retryCount.incrementAndGet();
-    LOGGER.warn(
+    PipeLogger.log(
+        LOGGER::warn,
         "Retry executing subtask {} (creation time: {}, simple class: {}), 
retry count {}, last exception: {}",
         taskID,
         creationTime,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 15ae89ee620..73cd8d8a4bf 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -103,7 +103,7 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeMinimumReceiverMemory();
   }
 
-  /////////////////////////////// Subtask Connector 
///////////////////////////////
+  /////////////////////////////// Subtask Sink ///////////////////////////////
 
   public int getPipeRealTimeQueuePollTsFileThreshold() {
     return COMMON_CONFIG.getPipeRealTimeQueuePollTsFileThreshold();
@@ -173,30 +173,34 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeSourceMatcherCacheSize();
   }
 
-  /////////////////////////////// Connector ///////////////////////////////
+  /////////////////////////////// Sink ///////////////////////////////
 
-  public int getPipeConnectorHandshakeTimeoutMs() {
-    return COMMON_CONFIG.getPipeConnectorHandshakeTimeoutMs();
+  public int getPipeSinkHandshakeTimeoutMs() {
+    return COMMON_CONFIG.getPipeSinkHandshakeTimeoutMs();
   }
 
-  public int getPipeConnectorTransferTimeoutMs() {
-    return COMMON_CONFIG.getPipeConnectorTransferTimeoutMs();
+  public int getPipeSinkTransferTimeoutMs() {
+    return COMMON_CONFIG.getPipeSinkTransferTimeoutMs();
   }
 
-  public int getPipeConnectorReadFileBufferSize() {
-    return COMMON_CONFIG.getPipeConnectorReadFileBufferSize();
+  public int getPipeSinkReadFileBufferSize() {
+    return COMMON_CONFIG.getPipeSinkReadFileBufferSize();
   }
 
-  public boolean isPipeConnectorReadFileBufferMemoryControlEnabled() {
-    return COMMON_CONFIG.isPipeConnectorReadFileBufferMemoryControlEnabled();
+  public boolean isPipeSinkReadFileBufferMemoryControlEnabled() {
+    return COMMON_CONFIG.isPipeSinkReadFileBufferMemoryControlEnabled();
   }
 
-  public long getPipeConnectorRetryIntervalMs() {
-    return COMMON_CONFIG.getPipeConnectorRetryIntervalMs();
+  public long getPipeSinkRetryIntervalMs() {
+    return COMMON_CONFIG.getPipeSinkRetryIntervalMs();
   }
 
-  public boolean isPipeConnectorRPCThriftCompressionEnabled() {
-    return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled();
+  public boolean isPipeSinkRetryLocallyForConnectionError() {
+    return COMMON_CONFIG.isPipeSinkRetryLocallyForConnectionError();
+  }
+
+  public boolean isPipeSinkRPCThriftCompressionEnabled() {
+    return COMMON_CONFIG.isPipeSinkRPCThriftCompressionEnabled();
   }
 
   public int getPipeAsyncSinkForcedRetryTsFileEventQueueSize() {
@@ -211,27 +215,27 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeAsyncSinkForcedRetryTotalEventQueueSize();
   }
 
-  public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
-    return COMMON_CONFIG.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall();
+  public long getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall() {
+    return COMMON_CONFIG.getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall();
   }
 
-  public int getPipeAsyncConnectorSelectorNumber() {
-    return COMMON_CONFIG.getPipeAsyncConnectorSelectorNumber();
+  public int getPipeAsyncSinkSelectorNumber() {
+    return COMMON_CONFIG.getPipeAsyncSinkSelectorNumber();
   }
 
-  public int getPipeAsyncConnectorMaxClientNumber() {
-    return COMMON_CONFIG.getPipeAsyncConnectorMaxClientNumber();
+  public int getPipeAsyncSinkMaxClientNumber() {
+    return COMMON_CONFIG.getPipeAsyncSinkMaxClientNumber();
   }
 
-  public int getPipeAsyncConnectorMaxTsFileClientNumber() {
-    return COMMON_CONFIG.getPipeAsyncConnectorMaxTsFileClientNumber();
+  public int getPipeAsyncSinkMaxTsFileClientNumber() {
+    return COMMON_CONFIG.getPipeAsyncSinkMaxTsFileClientNumber();
   }
 
   public double getPipeSendTsFileRateLimitBytesPerSecond() {
     return COMMON_CONFIG.getPipeSendTsFileRateLimitBytesPerSecond();
   }
 
-  public double getPipeAllConnectorsRateLimitBytesPerSecond() {
+  public double getPipeAllSinksRateLimitBytesPerSecond() {
     return COMMON_CONFIG.getPipeAllSinksRateLimitBytesPerSecond();
   }
 
@@ -239,8 +243,8 @@ public class PipeConfig {
     return COMMON_CONFIG.getRateLimiterHotReloadCheckIntervalMs();
   }
 
-  public int getPipeConnectorRequestSliceThresholdBytes() {
-    return COMMON_CONFIG.getPipeConnectorRequestSliceThresholdBytes();
+  public int getPipeSinkRequestSliceThresholdBytes() {
+    return COMMON_CONFIG.getPipeSinkRequestSliceThresholdBytes();
   }
 
   public long getPipeMaxReaderChunkSize() {
@@ -517,16 +521,14 @@ public class PipeConfig {
         getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes());
     LOGGER.info("PipeSourceMatcherCacheSize: {}", 
getPipeSourceMatcherCacheSize());
 
-    LOGGER.info("PipeConnectorHandshakeTimeoutMs: {}", 
getPipeConnectorHandshakeTimeoutMs());
-    LOGGER.info("PipeConnectorTransferTimeoutMs: {}", 
getPipeConnectorTransferTimeoutMs());
-    LOGGER.info("PipeConnectorReadFileBufferSize: {}", 
getPipeConnectorReadFileBufferSize());
-    LOGGER.info(
-        "PipeConnectorReadFileBufferMemoryControlEnabled: {}",
-        isPipeConnectorReadFileBufferMemoryControlEnabled());
-    LOGGER.info("PipeConnectorRetryIntervalMs: {}", 
getPipeConnectorRetryIntervalMs());
+    LOGGER.info("PipeSinkHandshakeTimeoutMs: {}", 
getPipeSinkHandshakeTimeoutMs());
+    LOGGER.info("PipeSinkTransferTimeoutMs: {}", 
getPipeSinkTransferTimeoutMs());
+    LOGGER.info("PipeSinkReadFileBufferSize: {}", 
getPipeSinkReadFileBufferSize());
     LOGGER.info(
-        "PipeConnectorRPCThriftCompressionEnabled: {}",
-        isPipeConnectorRPCThriftCompressionEnabled());
+        "PipeSinkReadFileBufferMemoryControlEnabled: {}",
+        isPipeSinkReadFileBufferMemoryControlEnabled());
+    LOGGER.info("PipeSinkRetryIntervalMs: {}", getPipeSinkRetryIntervalMs());
+    LOGGER.info("PipeSinkRPCThriftCompressionEnabled: {}", 
isPipeSinkRPCThriftCompressionEnabled());
     LOGGER.info(
         "PipeLeaderCacheMemoryUsagePercentage: {}", 
getPipeLeaderCacheMemoryUsagePercentage());
     LOGGER.info("PipeMaxAlignedSeriesChunkSizeInOneBatch: {}", 
getPipeMaxReaderChunkSize());
@@ -564,34 +566,29 @@ public class PipeConfig {
         getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold());
 
     LOGGER.info(
-        "PipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold: {}",
+        "PipeAsyncSinkForcedRetryTsFileEventQueueSizeThreshold: {}",
         getPipeAsyncSinkForcedRetryTsFileEventQueueSize());
     LOGGER.info(
-        "PipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold: {}",
+        "PipeAsyncSinkForcedRetryTabletEventQueueSizeThreshold: {}",
         getPipeAsyncSinkForcedRetryTabletEventQueueSize());
     LOGGER.info(
-        "PipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold: {}",
+        "PipeAsyncSinkForcedRetryTotalEventQueueSizeThreshold: {}",
         getPipeAsyncSinkForcedRetryTotalEventQueueSize());
     LOGGER.info(
-        "PipeAsyncConnectorMaxRetryExecutionTimeMsPerCall: {}",
-        getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall());
-    LOGGER.info("PipeAsyncConnectorSelectorNumber: {}", 
getPipeAsyncConnectorSelectorNumber());
-    LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}", 
getPipeAsyncConnectorMaxClientNumber());
-    LOGGER.info(
-        "PipeAsyncConnectorMaxTsFileClientNumber: {}",
-        getPipeAsyncConnectorMaxTsFileClientNumber());
+        "PipeAsyncSinkMaxRetryExecutionTimeMsPerCall: {}",
+        getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall());
+    LOGGER.info("PipeAsyncSinkSelectorNumber: {}", 
getPipeAsyncSinkSelectorNumber());
+    LOGGER.info("PipeAsyncSinkMaxClientNumber: {}", 
getPipeAsyncSinkMaxClientNumber());
+    LOGGER.info("PipeAsyncSinkMaxTsFileClientNumber: {}", 
getPipeAsyncSinkMaxTsFileClientNumber());
 
     LOGGER.info(
         "PipeSendTsFileRateLimitBytesPerSecond: {}", 
getPipeSendTsFileRateLimitBytesPerSecond());
     LOGGER.info(
-        "PipeAllConnectorsRateLimitBytesPerSecond: {}",
-        getPipeAllConnectorsRateLimitBytesPerSecond());
+        "PipeAllSinksRateLimitBytesPerSecond: {}", 
getPipeAllSinksRateLimitBytesPerSecond());
     LOGGER.info(
         "RateLimiterHotReloadCheckIntervalMs: {}", 
getRateLimiterHotReloadCheckIntervalMs());
 
-    LOGGER.info(
-        "PipeConnectorRequestSliceThresholdBytes: {}",
-        getPipeConnectorRequestSliceThresholdBytes());
+    LOGGER.info("PipeSinkRequestSliceThresholdBytes: {}", 
getPipeSinkRequestSliceThresholdBytes());
 
     LOGGER.info("SeperatedPipeHeartbeatEnabled: {}", 
isSeperatedPipeHeartbeatEnabled());
     LOGGER.info(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 05592809bbb..28c2465bb0f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -344,42 +344,49 @@ public class PipeDescriptor {
                         "pipe_extractor_matcher_cache_size",
                         
String.valueOf(config.getPipeSourceMatcherCacheSize())))));
 
-    config.setPipeConnectorHandshakeTimeoutMs(
+    config.setPipeSinkHandshakeTimeoutMs(
         Long.parseLong(
             
Optional.ofNullable(properties.getProperty("pipe_sink_handshake_timeout_ms"))
                 .orElse(
                     properties.getProperty(
                         "pipe_connector_handshake_timeout_ms",
-                        
String.valueOf(config.getPipeConnectorHandshakeTimeoutMs())))));
-    config.setPipeConnectorReadFileBufferSize(
+                        
String.valueOf(config.getPipeSinkHandshakeTimeoutMs())))));
+    config.setPipeSinkReadFileBufferSize(
         Integer.parseInt(
             
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_size"))
                 .orElse(
                     properties.getProperty(
                         "pipe_connector_read_file_buffer_size",
-                        
String.valueOf(config.getPipeConnectorReadFileBufferSize())))));
-    config.setIsPipeConnectorReadFileBufferMemoryControlEnabled(
+                        
String.valueOf(config.getPipeSinkReadFileBufferSize())))));
+    config.setIsPipeSinkReadFileBufferMemoryControlEnabled(
         Boolean.parseBoolean(
             
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_memory_control"))
                 .orElse(
                     properties.getProperty(
                         "pipe_connector_read_file_buffer_memory_control",
-                        String.valueOf(
-                            
config.isPipeConnectorReadFileBufferMemoryControlEnabled())))));
-    config.setPipeConnectorRetryIntervalMs(
+                        
String.valueOf(config.isPipeSinkReadFileBufferMemoryControlEnabled())))));
+    config.setPipeSinkRetryIntervalMs(
         Long.parseLong(
             
Optional.ofNullable(properties.getProperty("pipe_sink_retry_interval_ms"))
                 .orElse(
                     properties.getProperty(
                         "pipe_connector_retry_interval_ms",
-                        
String.valueOf(config.getPipeConnectorRetryIntervalMs())))));
-    config.setPipeConnectorRPCThriftCompressionEnabled(
+                        
String.valueOf(config.getPipeSinkRetryIntervalMs())))));
+    config.setPipeSinkRetryLocallyForConnectionError(
+        Boolean.parseBoolean(
+            Optional.ofNullable(
+                    
properties.getProperty("pipe_sink_retry_locally_for_connection_error"))
+                .orElse(
+                    properties.getProperty(
+                        "pipe_connector_retry_locally_for_connection_error",
+                        
String.valueOf(config.isPipeSinkRetryLocallyForConnectionError())))));
+    config.setPipeSinkRPCThriftCompressionEnabled(
         Boolean.parseBoolean(
             
Optional.ofNullable(properties.getProperty("pipe_sink_rpc_thrift_compression_enabled"))
                 .orElse(
                     properties.getProperty(
                         "pipe_connector_rpc_thrift_compression_enabled",
-                        
String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled())))));
+                        
String.valueOf(config.isPipeSinkRPCThriftCompressionEnabled())))));
     config.setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
         Long.parseLong(
             Optional.ofNullable(
@@ -387,8 +394,7 @@ public class PipeDescriptor {
                 .orElse(
                     properties.getProperty(
                         
"pipe_async_connector_max_retry_execution_time_ms_per_call",
-                        String.valueOf(
-                            
config.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())))));
+                        
String.valueOf(config.getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall())))));
     config.setPipeAsyncSinkForcedRetryTsFileEventQueueSize(
         Integer.parseInt(
             Optional.ofNullable(
@@ -425,7 +431,7 @@ public class PipeDescriptor {
         Integer.parseInt(
             properties.getProperty(
                 "pipe_connector_request_slice_threshold_bytes",
-                
String.valueOf(config.getPipeConnectorRequestSliceThresholdBytes()))));
+                
String.valueOf(config.getPipeSinkRequestSliceThresholdBytes()))));
 
     config.setPipeReceiverLoginPeriodicVerificationIntervalMs(
         Long.parseLong(
@@ -575,7 +581,7 @@ public class PipeDescriptor {
         parserPipeConfig(
             properties, "pipe_sink_timeout_ms", "pipe_connector_timeout_ms", 
isHotModify);
     if (value != null) {
-      config.setPipeConnectorTransferTimeoutMs(Long.parseLong(value));
+      config.setPipeSinkTransferTimeoutMs(Long.parseLong(value));
     }
 
     value =
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
index 3e009b279d7..1f76f5d2453 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java
@@ -58,7 +58,7 @@ public abstract class IoTDBClientManager {
   private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 
1 day
   private static final int FIRST_ADJUSTMENT_TIMEOUT_MS = 6 * 60 * 60 * 1000; 
// 6 hours
   protected static final AtomicInteger CONNECTION_TIMEOUT_MS =
-      new 
AtomicInteger(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+      new 
AtomicInteger(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs());
 
   protected IoTDBClientManager(
       final List<TEndPoint> endPointList,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
index a0bb65b5a7e..b7f42295e6c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
@@ -100,7 +100,7 @@ public class IoTDBSyncClient extends 
IClientRPCService.Client
 
   @Override
   public TPipeTransferResp pipeTransfer(final TPipeTransferReq req) throws 
TException {
-    final int bodySizeLimit = 
PipeConfig.getInstance().getPipeConnectorRequestSliceThresholdBytes();
+    final int bodySizeLimit = 
PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes();
     if (req.getVersion() != IoTDBSinkRequestVersion.VERSION_1.getVersion()
         || req.body.limit() < bodySizeLimit) {
       return super.pipeTransfer(req);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
index 4c5d51f83c5..76c145d0dab 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
@@ -198,9 +198,9 @@ public abstract class IoTDBSyncClientManager extends 
IoTDBClientManager implemen
       clientAndStatus.setLeft(
           new IoTDBSyncClient(
               new ThriftClientProperty.Builder()
-                  
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
+                  
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs())
                   .setRpcThriftCompressionEnabled(
-                      PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
+                      PIPE_CONFIG.isPipeSinkRPCThriftCompressionEnabled())
                   .build(),
               endPoint.getIp(),
               endPoint.getPort(),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
index 9a6aba5b90b..e497656eab2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/limiter/GlobalRPCRateLimiter.java
@@ -28,6 +28,6 @@ public class GlobalRPCRateLimiter extends GlobalRateLimiter {
 
   @Override
   protected double getThroughputBytesPerSecond() {
-    return CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond();
+    return CONFIG.getPipeAllSinksRateLimitBytesPerSecond();
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
index aba6c7e1d82..7d84e3bb98f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java
@@ -242,7 +242,7 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
     } else {
       supportModsIfIsDataNodeReceiver = true;
     }
-    socket.setSoTimeout(PIPE_CONFIG.getPipeConnectorTransferTimeoutMs());
+    socket.setSoTimeout(PIPE_CONFIG.getPipeSinkTransferTimeoutMs());
     LOGGER.info("Handshake success. Socket: {}", socket);
   }
 
@@ -269,7 +269,7 @@ public abstract class IoTDBAirGapSink extends IoTDBSink {
       final AirGapSocket socket,
       final boolean isMultiFile)
       throws PipeException, IOException {
-    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
     final byte[] readBuffer = new byte[readFileBufferSize];
     long position = 0;
     try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
index 9dda99c22d3..88a8b71775f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java
@@ -398,7 +398,7 @@ public abstract class IoTDBSink implements PipeConnector {
 
     nodeUrls.clear();
     nodeUrls.addAll(parseNodeUrls(parameters));
-    LOGGER.info("IoTDBConnector nodeUrls: {}", nodeUrls);
+    LOGGER.info("IoTDBSink nodeUrls: {}", nodeUrls);
 
     isTabletBatchModeEnabled =
         parameters.getBooleanOrDefault(
@@ -410,7 +410,7 @@ public abstract class IoTDBSink implements PipeConnector {
                     Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
                     CONNECTOR_FORMAT_HYBRID_VALUE)
                 .equals(CONNECTOR_FORMAT_TS_FILE_VALUE);
-    LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", 
isTabletBatchModeEnabled);
+    LOGGER.info("IoTDBSink isTabletBatchModeEnabled: {}", 
isTabletBatchModeEnabled);
 
     final boolean shouldMarkAsGeneralWriteRequest =
         parameters.getBooleanOrDefault(
@@ -426,7 +426,7 @@ public abstract class IoTDBSink implements PipeConnector {
               Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY, 
SINK_MARK_AS_PIPE_REQUEST_KEY),
               CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE);
     }
-    LOGGER.info("IoTDBConnector shouldMarkAsPipeRequest: {}", 
shouldMarkAsPipeRequest);
+    LOGGER.info("IoTDBSink shouldMarkAsPipeRequest: {}", 
shouldMarkAsPipeRequest);
 
     final String connectorSkipIfValue =
         parameters
@@ -445,7 +445,7 @@ public abstract class IoTDBSink implements PipeConnector {
       throw new PipeParameterNotValidException(
           String.format("Parameters in set %s are not allowed in 'skipif'", 
skipIfOptionSet));
     }
-    LOGGER.info("IoTDBConnector skipIfNoPrivileges: {}", skipIfNoPrivileges);
+    LOGGER.info("IoTDBSink skipIfNoPrivileges: {}", skipIfNoPrivileges);
 
     receiverStatusHandler =
         new PipeReceiverStatusHandler(
@@ -485,7 +485,7 @@ public abstract class IoTDBSink implements PipeConnector {
                 SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY),
             CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE);
     LOGGER.info(
-        "IoTDBConnector {} = {}",
+        "IoTDBSink {} = {}",
         CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY,
         shouldReceiverConvertOnTypeMismatch);
     isRealtimeFirst =
@@ -495,7 +495,7 @@ public abstract class IoTDBSink implements PipeConnector {
                 PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
             PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
     LOGGER.info(
-        "IoTDBConnector {} = {}", 
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, isRealtimeFirst);
+        "IoTDBSink {} = {}", PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, 
isRealtimeFirst);
   }
 
   protected LinkedHashSet<TEndPoint> parseNodeUrls(final PipeParameters 
parameters)
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
index b6b8e52f1fa..75a4607a23b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java
@@ -184,7 +184,7 @@ public abstract class IoTDBSslSyncSink extends IoTDBSink {
       final Pair<IoTDBSyncClient, Boolean> clientAndStatus,
       final boolean isMultiFile)
       throws PipeException, IOException {
-    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+    final int readFileBufferSize = 
PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
     final byte[] readBuffer = new byte[readFileBufferSize];
     long position = 0;
     try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {

Reply via email to