This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f280381be91 [IOTDB-6067] Pipe: Improve the stability of 
iotdb-thrift-connector-v2 during fault tolerance (avoid OOM) (#10550)
f280381be91 is described below

commit f280381be91083bc96ed054a61dd866dba9aaccd
Author: 马子坤 <[email protected]>
AuthorDate: Fri Jul 28 12:30:51 2023 +0800

    [IOTDB-6067] Pipe: Improve the stability of iotdb-thrift-connector-v2 
during fault tolerance (avoid OOM) (#10550)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |   8 +
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    |  52 ++-
 .../pipe/connector/v1/IoTDBThriftConnectorV1.java  |  22 +-
 .../pipe/connector/v2/IoTDBThriftConnectorV2.java  | 404 ++++++++++++++++-----
 ...nsferInsertNodeTabletInsertionEventHandler.java |  12 +-
 ...PipeTransferRawTabletInsertionEventHandler.java |  13 +-
 .../PipeTransferTabletInsertionEventHandler.java   |  69 +---
 .../PipeTransferTsFileInsertionEventHandler.java   |  64 +---
 .../execution/load/LoadTsFileManager.java          |   7 +
 .../resources/conf/iotdb-common.properties         |  12 +
 .../iotdb/commons/client/ClientPoolFactory.java    |  16 +-
 .../async/AsyncPipeDataTransferServiceClient.java  |  19 +-
 .../iotdb/commons/concurrent/ThreadName.java       |   5 +
 .../apache/iotdb/commons/conf/CommonConfig.java    |  39 ++
 .../iotdb/commons/conf/CommonDescriptor.java       |  21 ++
 .../iotdb/commons/pipe/config/PipeConfig.java      |  16 +
 16 files changed, 557 insertions(+), 222 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 391bdc97ba2..5112d1e730d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.agent.runtime;
 
 import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
 import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
@@ -107,6 +108,13 @@ public class PipeRuntimeAgent implements IService {
         pipeTaskMeta,
         pipeRuntimeException.getMessage(),
         pipeRuntimeException);
+
     pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);
+
+    // Quick stop all pipes locally if critical exception occurs,
+    // no need to wait for the next heartbeat cycle.
+    if (pipeRuntimeException instanceof PipeRuntimeCriticalException) {
+      PipeAgent.task().stopAllPipesWithCriticalException();
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 05ccda0ed1b..2ebe4156b34 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.pipe.agent.task;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
 import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
@@ -109,13 +111,13 @@ public class PipeTaskAgent {
       List<PipeMeta> pipeMetaListFromConfigNode) {
     acquireWriteLock();
     try {
-      return handlePipeMetaChangesWithoutLock(pipeMetaListFromConfigNode);
+      return handlePipeMetaChangesInternal(pipeMetaListFromConfigNode);
     } finally {
       releaseWriteLock();
     }
   }
 
-  private List<TPushPipeMetaRespExceptionMessage> 
handlePipeMetaChangesWithoutLock(
+  private List<TPushPipeMetaRespExceptionMessage> 
handlePipeMetaChangesInternal(
       List<PipeMeta> pipeMetaListFromConfigNode) {
     // Do nothing if data node is removing or removed
     if (PipeAgent.runtime().isShutdown()) {
@@ -294,13 +296,13 @@ public class PipeTaskAgent {
   public synchronized void dropAllPipeTasks() {
     acquireWriteLock();
     try {
-      dropAllPipeTasksWithoutLock();
+      dropAllPipeTasksInternal();
     } finally {
       releaseWriteLock();
     }
   }
 
-  private void dropAllPipeTasksWithoutLock() {
+  private void dropAllPipeTasksInternal() {
     for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
       try {
         dropPipe(
@@ -315,6 +317,40 @@ public class PipeTaskAgent {
     }
   }
 
+  public synchronized void stopAllPipesWithCriticalException() {
+    acquireWriteLock();
+    try {
+      stopAllPipesWithCriticalExceptionInternal();
+    } finally {
+      releaseWriteLock();
+    }
+  }
+
+  private void stopAllPipesWithCriticalExceptionInternal() {
+    pipeMetaKeeper
+        .getPipeMetaList()
+        .forEach(
+            pipeMeta -> {
+              final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
+              final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
+
+              if (runtimeMeta.getStatus().get() == PipeStatus.RUNNING) {
+                runtimeMeta
+                    .getConsensusGroupId2TaskMetaMap()
+                    .values()
+                    .forEach(
+                        pipeTaskMeta -> {
+                          for (final PipeRuntimeException e : 
pipeTaskMeta.getExceptionMessages()) {
+                            if (e instanceof PipeRuntimeCriticalException) {
+                              stopPipe(staticMeta.getPipeName(), 
staticMeta.getCreationTime());
+                              return;
+                            }
+                          }
+                        });
+              }
+            });
+  }
+
   ////////////////////////// Manage by Pipe Name //////////////////////////
 
   /**
@@ -671,13 +707,13 @@ public class PipeTaskAgent {
       throws TException {
     acquireReadLock();
     try {
-      collectPipeMetaListWithoutLock(req, resp);
+      collectPipeMetaListInternal(req, resp);
     } finally {
       releaseReadLock();
     }
   }
 
-  private void collectPipeMetaListWithoutLock(THeartbeatReq req, 
THeartbeatResp resp)
+  private void collectPipeMetaListInternal(THeartbeatReq req, THeartbeatResp 
resp)
       throws TException {
     // Do nothing if data node is removing or removed, or request does not 
need pipe meta list
     if (PipeAgent.runtime().isShutdown() || !req.isNeedPipeMetaList()) {
@@ -700,13 +736,13 @@ public class PipeTaskAgent {
       throws TException {
     acquireReadLock();
     try {
-      collectPipeMetaListWithoutLock(req, resp);
+      collectPipeMetaListInternal(req, resp);
     } finally {
       releaseReadLock();
     }
   }
 
-  private void collectPipeMetaListWithoutLock(TPipeHeartbeatReq req, 
TPipeHeartbeatResp resp)
+  private void collectPipeMetaListInternal(TPipeHeartbeatReq req, 
TPipeHeartbeatResp resp)
       throws TException {
     // Do nothing if data node is removing or removed, or request does not 
need pipe meta list
     if (PipeAgent.runtime().isShutdown()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
index b12646bdf02..d7ca781e95a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftConnectorV1.java
@@ -45,7 +45,6 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,6 +68,15 @@ public class IoTDBThriftConnectorV1 implements PipeConnector 
{
 
   private IoTDBThriftConnectorClient client;
 
+  public IoTDBThriftConnectorV1() {
+    // Do nothing
+  }
+
+  public IoTDBThriftConnectorV1(String ipAddress, int port) {
+    this.ipAddress = ipAddress;
+    this.port = port;
+  }
+
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
     validator
@@ -134,9 +142,11 @@ public class IoTDBThriftConnectorV1 implements 
PipeConnector {
       } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
         doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent);
       } else {
-        throw new NotImplementedException(
+        LOGGER.warn(
             "IoTDBThriftConnectorV1 only support "
-                + "PipeInsertNodeTabletInsertionEvent and 
PipeRawTabletInsertionEvent.");
+                + "PipeInsertNodeTabletInsertionEvent and 
PipeRawTabletInsertionEvent. "
+                + "Ignore {}.",
+            tabletInsertionEvent);
       }
     } catch (TException e) {
       throw new PipeConnectionException(
@@ -151,8 +161,10 @@ public class IoTDBThriftConnectorV1 implements 
PipeConnector {
   public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
     // PipeProcessor can change the type of TabletInsertionEvent
     if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
-      throw new NotImplementedException(
-          "IoTDBThriftConnectorV1 only support PipeTsFileInsertionEvent.");
+      LOGGER.warn(
+          "IoTDBThriftConnectorV1 only support PipeTsFileInsertionEvent. 
Ignore {}.",
+          tsFileInsertionEvent);
+      return;
     }
 
     try {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
index 6d9372f7922..ffffd075051 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
@@ -23,10 +23,11 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
-import org.apache.iotdb.commons.client.property.ThriftClientProperty;
-import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftConnectorClient;
+import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftConnectorV1;
 import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferHandshakeReq;
 import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferInsertNodeReq;
 import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferTabletReq;
@@ -51,8 +52,8 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 import org.apache.iotdb.session.util.SessionUtils;
 import org.apache.iotdb.tsfile.utils.Pair;
 
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,25 +64,35 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
 import java.util.PriorityQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
 
 public class IoTDBThriftConnectorV2 implements PipeConnector {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBThriftConnectorV2.class);
+
   private static final String FAILED_TO_BORROW_CLIENT_FORMATTER =
       "Failed to borrow client from client pool for receiver %s:%s.";
 
-  private static final CommonConfig COMMON_CONFIG = 
CommonDescriptor.getInstance().getConfig();
-
-  private static volatile IClientManager<TEndPoint, 
AsyncPipeDataTransferServiceClient>
-      asyncPipeDataTransferClientManagerHolder;
+  private static final AtomicReference<
+          IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>>
+      ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new AtomicReference<>();
   private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
       asyncPipeDataTransferClientManager;
 
-  private final AtomicBoolean isClosed = new AtomicBoolean(false);
+  private static final AtomicReference<ScheduledExecutorService> RETRY_TRIGGER 
=
+      new AtomicReference<>();
+  private static final int RETRY_TRIGGER_INTERVAL_MINUTES = 1;
+  private final AtomicReference<Future<?>> retryTriggerFuture = new 
AtomicReference<>();
+  private final AtomicReference<IoTDBThriftConnectorV1> retryConnector = new 
AtomicReference<>();
+  private final PriorityQueue<Pair<Long, Event>> retryEventQueue =
+      new PriorityQueue<>(Comparator.comparing(o -> o.left));
 
   private final AtomicLong commitIdGenerator = new AtomicLong(0);
   private final AtomicLong lastCommitId = new AtomicLong(0);
@@ -91,41 +102,17 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
   private List<TEndPoint> nodeUrls;
 
   public IoTDBThriftConnectorV2() {
-    if (asyncPipeDataTransferClientManagerHolder == null) {
+    if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
       synchronized (IoTDBThriftConnectorV2.class) {
-        if (asyncPipeDataTransferClientManagerHolder == null) {
-          asyncPipeDataTransferClientManagerHolder =
+        if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
+          ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.set(
               new IClientManager.Factory<TEndPoint, 
AsyncPipeDataTransferServiceClient>()
                   .createClientManager(
-                      new 
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory());
+                      new 
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
         }
       }
     }
-
-    asyncPipeDataTransferClientManager = 
asyncPipeDataTransferClientManagerHolder;
-  }
-
-  public synchronized void commit(long requestCommitId, @Nullable 
EnrichedEvent enrichedEvent) {
-    commitQueue.offer(
-        new Pair<>(
-            requestCommitId,
-            () ->
-                Optional.ofNullable(enrichedEvent)
-                    .ifPresent(
-                        event ->
-                            
event.decreaseReferenceCount(IoTDBThriftConnectorV2.class.getName()))));
-
-    while (!commitQueue.isEmpty()) {
-      final Pair<Long, Runnable> committer = commitQueue.peek();
-      if (lastCommitId.get() + 1 != committer.left) {
-        break;
-      }
-
-      committer.right.run();
-      lastCommitId.incrementAndGet();
-
-      commitQueue.poll();
-    }
+    asyncPipeDataTransferClientManager = 
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get();
   }
 
   @Override
@@ -142,33 +129,58 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
             
Arrays.asList(parameters.getString(CONNECTOR_IOTDB_NODE_URLS_KEY).split(",")));
     if (nodeUrls.isEmpty()) {
       throw new PipeException("Node urls is empty.");
+    } else {
+      LOGGER.info("Node urls: {}.", nodeUrls);
     }
   }
 
   @Override
-  public void handshake() throws Exception {
-    final TEndPoint firstNodeUrl = nodeUrls.get(0);
-    try (IoTDBThriftConnectorClient client =
-        new IoTDBThriftConnectorClient(
-            new ThriftClientProperty.Builder()
-                
.setConnectionTimeoutMs(COMMON_CONFIG.getConnectionTimeoutInMS())
-                
.setRpcThriftCompressionEnabled(COMMON_CONFIG.isRpcThriftCompressionEnabled())
-                .build(),
-            firstNodeUrl.getIp(),
-            firstNodeUrl.getPort())) {
-      final TPipeTransferResp resp =
-          client.pipeTransfer(
-              PipeTransferHandshakeReq.toTPipeTransferReq(
-                  
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
-      if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        throw new PipeException(String.format("Handshake error, result status 
%s.", resp.status));
+  // synchronized to avoid close connector when transfer event
+  public synchronized void handshake() throws Exception {
+    if (retryConnector.get() != null) {
+      try {
+        retryConnector.get().close();
+      } catch (Exception e) {
+        LOGGER.warn("Failed to close connector to receiver when try to 
handshake.", e);
+      }
+      retryConnector.set(null);
+    }
+
+    for (final TEndPoint endPoint : nodeUrls) {
+      final IoTDBThriftConnectorV1 connector =
+          new IoTDBThriftConnectorV1(endPoint.getIp(), endPoint.getPort());
+      try {
+        connector.handshake();
+        retryConnector.set(connector);
+        LOGGER.info(
+            "Handshake successfully with receiver {}:{}.", endPoint.getIp(), 
endPoint.getPort());
+        break;
+      } catch (Exception e) {
+        LOGGER.warn(
+            String.format(
+                "Handshake error with receiver %s:%s, retrying...",
+                endPoint.getIp(), endPoint.getPort()),
+            e);
+        try {
+          connector.close();
+        } catch (Exception ex) {
+          LOGGER.warn(
+              String.format(
+                  "Failed to close connector to receiver %s:%s when handshake 
error.",
+                  endPoint.getIp(), endPoint.getPort()),
+              ex);
+        }
       }
-    } catch (TException e) {
+    }
+
+    if (retryConnector.get() == null) {
       throw new PipeConnectionException(
           String.format(
-              "Connect to receiver %s:%s error: %s",
-              e.getMessage(), firstNodeUrl.getIp(), firstNodeUrl.getPort()),
-          e);
+              "Failed to connect to all receivers %s.",
+              nodeUrls.stream()
+                  .map(endPoint -> endPoint.getIp() + ":" + endPoint.getPort())
+                  .reduce((s1, s2) -> s1 + "," + s2)
+                  .orElse("")));
     }
   }
 
@@ -179,8 +191,20 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
 
   @Override
   public void transfer(TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
+    transferQueuedEventsIfNecessary();
+
+    if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
+        && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
+      LOGGER.warn(
+          "IoTDBThriftConnectorV2 only support 
PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. "
+              + "Current event: {}.",
+          tabletInsertionEvent);
+      return;
+    }
+
+    final long requestCommitId = commitIdGenerator.incrementAndGet();
+
     if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
-      final long requestCommitId = commitIdGenerator.incrementAndGet();
       final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent =
           (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
       final PipeTransferInsertNodeReq pipeTransferInsertNodeReq =
@@ -191,8 +215,7 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
               requestCommitId, pipeInsertNodeTabletInsertionEvent, 
pipeTransferInsertNodeReq, this);
 
       transfer(requestCommitId, pipeTransferInsertNodeReqHandler);
-    } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
-      final long requestCommitId = commitIdGenerator.incrementAndGet();
+    } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
       final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
           (PipeRawTabletInsertionEvent) tabletInsertionEvent;
       final PipeTransferTabletReq pipeTransferTabletReq =
@@ -201,24 +224,19 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
               pipeRawTabletInsertionEvent.isAligned());
       final PipeTransferRawTabletInsertionEventHandler 
pipeTransferTabletReqHandler =
           new PipeTransferRawTabletInsertionEventHandler(
-              requestCommitId, pipeTransferTabletReq, this);
+              requestCommitId, pipeRawTabletInsertionEvent, 
pipeTransferTabletReq, this);
 
       transfer(requestCommitId, pipeTransferTabletReqHandler);
-    } else {
-      throw new NotImplementedException(
-          "IoTDBThriftConnectorV2 only support "
-              + "PipeInsertNodeTabletInsertionEvent and 
PipeRawTabletInsertionEvent.");
     }
   }
 
-  public void transfer(
+  private void transfer(
       long requestCommitId,
       PipeTransferInsertNodeTabletInsertionEventHandler 
pipeTransferInsertNodeReqHandler) {
     final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId % 
nodeUrls.size()));
 
     try {
-      final AsyncPipeDataTransferServiceClient client =
-          asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl);
+      final AsyncPipeDataTransferServiceClient client = 
borrowClient(targetNodeUrl);
 
       try {
         pipeTransferInsertNodeReqHandler.transfer(client);
@@ -238,14 +256,13 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
     }
   }
 
-  public void transfer(
+  private void transfer(
       long requestCommitId,
       PipeTransferRawTabletInsertionEventHandler pipeTransferTabletReqHandler) 
{
     final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId % 
nodeUrls.size()));
 
     try {
-      final AsyncPipeDataTransferServiceClient client =
-          asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl);
+      final AsyncPipeDataTransferServiceClient client = 
borrowClient(targetNodeUrl);
 
       try {
         pipeTransferTabletReqHandler.transfer(client);
@@ -267,12 +284,17 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
 
   @Override
   public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
+    transferQueuedEventsIfNecessary();
+
     if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
-      throw new NotImplementedException(
-          "IoTDBThriftConnectorV2 only support PipeTsFileInsertionEvent.");
+      LOGGER.warn(
+          "IoTDBThriftConnectorV2 only support PipeTsFileInsertionEvent. 
Current event: {}.",
+          tsFileInsertionEvent);
+      return;
     }
 
     final long requestCommitId = commitIdGenerator.incrementAndGet();
+
     final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
         (PipeTsFileInsertionEvent) tsFileInsertionEvent;
     final PipeTransferTsFileInsertionEventHandler 
pipeTransferTsFileInsertionEventHandler =
@@ -283,14 +305,13 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
     transfer(requestCommitId, pipeTransferTsFileInsertionEventHandler);
   }
 
-  public void transfer(
+  private void transfer(
       long requestCommitId,
       PipeTransferTsFileInsertionEventHandler 
pipeTransferTsFileInsertionEventHandler) {
     final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId % 
nodeUrls.size()));
 
     try {
-      final AsyncPipeDataTransferServiceClient client =
-          asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl);
+      final AsyncPipeDataTransferServiceClient client = 
borrowClient(targetNodeUrl);
 
       try {
         pipeTransferTsFileInsertionEventHandler.transfer(client);
@@ -311,16 +332,233 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
   }
 
   @Override
-  public void transfer(Event event) {
+  public void transfer(Event event) throws Exception {
+    transferQueuedEventsIfNecessary();
+
     LOGGER.warn("IoTDBThriftConnectorV2 does not support transfer generic 
event: {}.", event);
   }
 
-  @Override
-  public void close() {
-    isClosed.set(true);
+  private AsyncPipeDataTransferServiceClient borrowClient(TEndPoint 
targetNodeUrl)
+      throws PipeConnectionException {
+    try {
+      while (true) {
+        final AsyncPipeDataTransferServiceClient client =
+            asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl);
+        if (handshakeIfNecessary(targetNodeUrl, client)) {
+          return client;
+        }
+      }
+    } catch (Exception e) {
+      throw new PipeConnectionException(
+          String.format(
+              FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(), 
targetNodeUrl.getPort()),
+          e);
+    }
+  }
+
+  /**
+   * Handshake with the target if necessary.
+   *
+   * @param client client to handshake
+   * @return true if the handshake is already finished, false if the handshake 
is not finished yet
+   *     and finished in this method
+   * @throws Exception if an error occurs.
+   */
+  private boolean handshakeIfNecessary(
+      TEndPoint targetNodeUrl, AsyncPipeDataTransferServiceClient client) 
throws Exception {
+    if (client.isHandshakeFinished()) {
+      return true;
+    }
+
+    final AtomicBoolean isHandshakeFinished = new AtomicBoolean(false);
+    final AtomicReference<Exception> exception = new AtomicReference<>();
+
+    client.pipeTransfer(
+        PipeTransferHandshakeReq.toTPipeTransferReq(
+            
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
+        new AsyncMethodCallback<TPipeTransferResp>() {
+          @Override
+          public void onComplete(TPipeTransferResp response) {
+            if (response.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+              LOGGER.warn(
+                  "Handshake error with receiver {}:{}, code: {}, message: 
{}.",
+                  targetNodeUrl.getIp(),
+                  targetNodeUrl.getPort(),
+                  response.getStatus().getCode(),
+                  response.getStatus().getMessage());
+              exception.set(
+                  new PipeConnectionException(
+                      String.format(
+                          "Handshake error with receiver %s:%s, code: %d, 
message: %s.",
+                          targetNodeUrl.getIp(),
+                          targetNodeUrl.getPort(),
+                          response.getStatus().getCode(),
+                          response.getStatus().getMessage())));
+            } else {
+              LOGGER.info(
+                  "Handshake successfully with receiver {}:{}.",
+                  targetNodeUrl.getIp(),
+                  targetNodeUrl.getPort());
+              client.markHandshakeFinished();
+            }
+
+            isHandshakeFinished.set(true);
+          }
+
+          @Override
+          public void onError(Exception e) {
+            LOGGER.warn(
+                "Handshake error with receiver {}:{}.",
+                targetNodeUrl.getIp(),
+                targetNodeUrl.getPort(),
+                e);
+            exception.set(e);
+
+            isHandshakeFinished.set(true);
+          }
+        });
+
+    try {
+      while (!isHandshakeFinished.get()) {
+        Thread.sleep(10);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new PipeException("Interrupted while waiting for handshake 
response.", e);
+    }
+
+    if (exception.get() != null) {
+      throw new PipeConnectionException("Failed to handshake.", 
exception.get());
+    }
+
+    return false;
+  }
+
+  /**
+   * Transfer queued events which are waiting for retry.
+   *
+   * @throws Exception if an error occurs. The error will be handled by pipe 
framework, which will
+   *     retry the event and mark the event as failure and stop the pipe if 
the retry times exceeds
+   *     the threshold.
+   * @see PipeConnector#transfer(Event) for more details.
+   * @see PipeConnector#transfer(TabletInsertionEvent) for more details.
+   * @see PipeConnector#transfer(TsFileInsertionEvent) for more details.
+   */
+  private synchronized void transferQueuedEventsIfNecessary() throws Exception 
{
+    while (!retryEventQueue.isEmpty()) {
+      final Pair<Long, Event> queuedEventPair = retryEventQueue.peek();
+      final long requestCommitId = queuedEventPair.getLeft();
+      final Event event = queuedEventPair.getRight();
+
+      IoTDBThriftConnectorV1 connector = retryConnector.get();
+      if (connector == null) {
+        LOGGER.warn("Retry connector is broken. Will try to reconnect it by 
handshake.");
+        handshake();
+      }
+      connector = retryConnector.get();
+
+      if (event instanceof PipeInsertNodeTabletInsertionEvent) {
+        connector.transfer((PipeInsertNodeTabletInsertionEvent) event);
+      } else if (event instanceof PipeRawTabletInsertionEvent) {
+        connector.transfer((PipeRawTabletInsertionEvent) event);
+      } else if (event instanceof PipeTsFileInsertionEvent) {
+        connector.transfer((PipeTsFileInsertionEvent) event);
+      } else {
+        LOGGER.warn("IoTDBThriftConnectorV2 does not support transfer generic 
event: {}.", event);
+      }
+
+      if (event instanceof EnrichedEvent) {
+        commit(requestCommitId, (EnrichedEvent) event);
+      }
+
+      retryEventQueue.poll();
+    }
   }
 
-  public boolean isClosed() {
-    return isClosed.get();
+  /**
+   * Commit the event. Decrease the reference count of the event. If the 
reference count is 0, the
+   * progress index of the event will be recalculated and the resources of the 
event will be
+   * released.
+   *
+   * <p>The synchronization is necessary because the commit order must be the 
same as the order of
+   * the events. Concurrent commit may cause the commit order to be 
inconsistent with the order of
+   * the events.
+   *
+   * @param requestCommitId commit id of the request
+   * @param enrichedEvent event to commit
+   */
+  public synchronized void commit(long requestCommitId, @Nullable 
EnrichedEvent enrichedEvent) {
+    commitQueue.offer(
+        new Pair<>(
+            requestCommitId,
+            () ->
+                Optional.ofNullable(enrichedEvent)
+                    .ifPresent(
+                        event ->
+                            
event.decreaseReferenceCount(IoTDBThriftConnectorV2.class.getName()))));
+
+    while (!commitQueue.isEmpty()) {
+      final Pair<Long, Runnable> committer = commitQueue.peek();
+      if (lastCommitId.get() + 1 != committer.left) {
+        break;
+      }
+
+      committer.right.run();
+      lastCommitId.incrementAndGet();
+
+      commitQueue.poll();
+    }
+  }
+
+  /**
+   * Add failure event to retry queue.
+   *
+   * @param requestCommitId commit id of the request
+   * @param event event to retry
+   */
+  public void addFailureEventToRetryQueue(long requestCommitId, Event event) {
+    if (RETRY_TRIGGER.get() == null) {
+      synchronized (IoTDBThriftConnectorV2.class) {
+        if (RETRY_TRIGGER.get() == null) {
+          RETRY_TRIGGER.set(
+              IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+                  ThreadName.PIPE_ASYNC_CONNECTOR_RETRY_TRIGGER.getName()));
+        }
+      }
+    }
+
+    if (retryTriggerFuture.get() == null) {
+      synchronized (IoTDBThriftConnectorV2.class) {
+        if (retryTriggerFuture.get() == null) {
+          retryTriggerFuture.set(
+              ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+                  RETRY_TRIGGER.get(),
+                  () -> {
+                    try {
+                      transferQueuedEventsIfNecessary();
+                    } catch (Exception e) {
+                      LOGGER.warn("Failed to trigger retry.", e);
+                    }
+                  },
+                  RETRY_TRIGGER_INTERVAL_MINUTES,
+                  RETRY_TRIGGER_INTERVAL_MINUTES,
+                  TimeUnit.MINUTES));
+        }
+      }
+    }
+
+    retryEventQueue.offer(new Pair<>(requestCommitId, event));
+  }
+
+  @Override
+  // synchronized to avoid close connector when transfer event
+  public synchronized void close() throws Exception {
+    if (retryTriggerFuture.get() != null) {
+      retryTriggerFuture.get().cancel(false);
+    }
+
+    if (retryConnector.get() != null) {
+      retryConnector.get().close();
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferInsertNodeTabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferInsertNodeTabletInsertionEventHandler.java
index f8694e71aba..4cb3643ba03 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferInsertNodeTabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferInsertNodeTabletInsertionEventHandler.java
@@ -21,19 +21,18 @@ package org.apache.iotdb.db.pipe.connector.v2.handler;
 
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.db.pipe.connector.v2.IoTDBThriftConnectorV2;
-import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
 import org.apache.thrift.TException;
 
-import javax.annotation.Nullable;
-
 public class PipeTransferInsertNodeTabletInsertionEventHandler
     extends PipeTransferTabletInsertionEventHandler<TPipeTransferResp> {
+
   public PipeTransferInsertNodeTabletInsertionEventHandler(
       long requestCommitId,
-      @Nullable EnrichedEvent event,
+      PipeInsertNodeTabletInsertionEvent event,
       TPipeTransferReq req,
       IoTDBThriftConnectorV2 connector) {
     super(requestCommitId, event, req, connector);
@@ -44,9 +43,4 @@ public class PipeTransferInsertNodeTabletInsertionEventHandler
       throws TException {
     client.pipeTransfer(req, this);
   }
-
-  @Override
-  protected void retryTransfer(IoTDBThriftConnectorV2 connector, long 
requestCommitId) {
-    connector.transfer(requestCommitId, this);
-  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferRawTabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferRawTabletInsertionEventHandler.java
index d0096eaafc2..c35c0103ce2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferRawTabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferRawTabletInsertionEventHandler.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.connector.v2.handler;
 
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.db.pipe.connector.v2.IoTDBThriftConnectorV2;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
@@ -30,8 +31,11 @@ public class PipeTransferRawTabletInsertionEventHandler
     extends PipeTransferTabletInsertionEventHandler<TPipeTransferResp> {
 
   public PipeTransferRawTabletInsertionEventHandler(
-      long requestCommitId, TPipeTransferReq req, IoTDBThriftConnectorV2 
connector) {
-    super(requestCommitId, null, req, connector);
+      long requestCommitId,
+      PipeRawTabletInsertionEvent event,
+      TPipeTransferReq req,
+      IoTDBThriftConnectorV2 connector) {
+    super(requestCommitId, event, req, connector);
   }
 
   @Override
@@ -39,9 +43,4 @@ public class PipeTransferRawTabletInsertionEventHandler
       throws TException {
     client.pipeTransfer(req, this);
   }
-
-  @Override
-  protected void retryTransfer(IoTDBThriftConnectorV2 connector, long 
requestCommitId) {
-    connector.transfer(requestCommitId, this);
-  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java
index 0f686bad747..3ab2220a477 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.java
@@ -20,9 +20,9 @@
 package org.apache.iotdb.db.pipe.connector.v2.handler;
 
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.connector.v2.IoTDBThriftConnectorV2;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -33,10 +33,7 @@ import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 
 public abstract class PipeTransferTabletInsertionEventHandler<E extends 
TPipeTransferResp>
     implements AsyncMethodCallback<E> {
@@ -45,20 +42,13 @@ public abstract class 
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
       LoggerFactory.getLogger(PipeTransferTabletInsertionEventHandler.class);
 
   private final long requestCommitId;
-  private final EnrichedEvent event;
+  private final Event event;
   private final TPipeTransferReq req;
 
   private final IoTDBThriftConnectorV2 connector;
 
-  private static final long MAX_RETRY_WAIT_TIME_MS =
-      (long) (PipeConfig.getInstance().getPipeConnectorRetryIntervalMs() * 
Math.pow(2, 5));
-  private int retryCount = 0;
-
   protected PipeTransferTabletInsertionEventHandler(
-      long requestCommitId,
-      @Nullable EnrichedEvent event,
-      TPipeTransferReq req,
-      IoTDBThriftConnectorV2 connector) {
+      long requestCommitId, Event event, TPipeTransferReq req, 
IoTDBThriftConnectorV2 connector) {
     this.requestCommitId = requestCommitId;
     this.event = event;
     this.req = req;
@@ -66,7 +56,13 @@ public abstract class 
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
 
     Optional.ofNullable(event)
         .ifPresent(
-            e -> 
e.increaseReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName()));
+            e -> {
+              if (e instanceof EnrichedEvent) {
+                ((EnrichedEvent) e)
+                    .increaseReferenceCount(
+                        
PipeTransferTabletInsertionEventHandler.class.getName());
+              }
+            });
   }
 
   public void transfer(AsyncPipeDataTransferServiceClient client) throws 
TException {
@@ -85,7 +81,8 @@ public abstract class 
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
     }
 
     if (response.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      connector.commit(requestCommitId, event);
+      connector.commit(
+          requestCommitId, event instanceof EnrichedEvent ? (EnrichedEvent) 
event : null);
     } else {
       onError(new PipeException(response.getStatus().getMessage()));
     }
@@ -93,40 +90,12 @@ public abstract class 
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
 
   @Override
   public void onError(Exception exception) {
-    ++retryCount;
-
-    CompletableFuture.runAsync(
-        () -> {
-          try {
-            Thread.sleep(
-                Math.min(
-                    (long)
-                        
(PipeConfig.getInstance().getPipeConnectorRetryIntervalMs()
-                            * Math.pow(2d, retryCount - 1d)),
-                    MAX_RETRY_WAIT_TIME_MS));
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            LOGGER.warn("Unexpected interruption during retrying", e);
-          }
-
-          if (connector.isClosed()) {
-            LOGGER.info(
-                "IoTDBThriftConnectorV2 has been stopped, "
-                    + "we will not retry this request {} after {} times",
-                req,
-                retryCount,
-                exception);
-          } else {
-            LOGGER.warn(
-                "IoTDBThriftConnectorV2 failed to transfer request {} after {} 
times, retrying...",
-                req,
-                retryCount,
-                exception);
-
-            retryTransfer(connector, requestCommitId);
-          }
-        });
-  }
+    LOGGER.warn(
+        "Failed to transfer TabletInsertionEvent {} (requestCommitId={}).",
+        event,
+        requestCommitId,
+        exception);
 
-  protected abstract void retryTransfer(IoTDBThriftConnectorV2 connector, long 
requestCommitId);
+    connector.addFailureEventToRetryQueue(requestCommitId, event);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java
index 692fa0f7ada..2172d551e9f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -40,7 +40,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.util.Arrays;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class PipeTransferTsFileInsertionEventHandler
@@ -58,16 +57,12 @@ public class PipeTransferTsFileInsertionEventHandler
   private final byte[] readBuffer;
   private long position;
 
-  private RandomAccessFile reader;
+  private final RandomAccessFile reader;
 
   private final AtomicBoolean isSealSignalSent;
 
   private AsyncPipeDataTransferServiceClient client;
 
-  private static final long MAX_RETRY_WAIT_TIME_MS =
-      (long) (PipeConfig.getInstance().getPipeConnectorRetryIntervalMs() * 
Math.pow(2, 5));
-  private int retryCount = 0;
-
   public PipeTransferTsFileInsertionEventHandler(
       long requestCommitId, PipeTsFileInsertionEvent event, 
IoTDBThriftConnectorV2 connector)
       throws FileNotFoundException {
@@ -127,14 +122,17 @@ public class PipeTransferTsFileInsertionEventHandler
           reader.close();
         }
       } catch (IOException e) {
-        LOGGER.warn("Failed to close file reader.", e);
+        LOGGER.warn("Failed to close file reader when successfully transferred 
file.", e);
       } finally {
+        connector.commit(requestCommitId, event);
+
+        LOGGER.info(
+            "Successfully transferred file {}. Request commit id is {}.", 
tsFile, requestCommitId);
+
         if (client != null) {
           client.setShouldReturnSelf(true);
           client.returnSelf();
         }
-
-        connector.commit(requestCommitId, event);
       }
       return;
     }
@@ -165,58 +163,22 @@ public class PipeTransferTsFileInsertionEventHandler
 
   @Override
   public void onError(Exception exception) {
+    LOGGER.warn(
+        "Failed to transfer tsfile {} (request commit id {}).", tsFile, 
requestCommitId, exception);
+
     try {
       if (reader != null) {
         reader.close();
       }
     } catch (IOException e) {
-      LOGGER.warn("Failed to close file reader.", e);
+      LOGGER.warn("Failed to close file reader when failed to transfer file.", 
e);
     } finally {
+      connector.addFailureEventToRetryQueue(requestCommitId, event);
+
       if (client != null) {
         client.setShouldReturnSelf(true);
         client.returnSelf();
       }
     }
-
-    ++retryCount;
-
-    CompletableFuture.runAsync(
-        () -> {
-          try {
-            Thread.sleep(
-                Math.min(
-                    (long)
-                        
(PipeConfig.getInstance().getPipeConnectorRetryIntervalMs()
-                            * Math.pow(2d, retryCount - 1d)),
-                    MAX_RETRY_WAIT_TIME_MS));
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            LOGGER.warn("Unexpected interruption during retrying", e);
-          }
-
-          if (connector.isClosed()) {
-            LOGGER.info(
-                "IoTDBThriftConnectorV2 has been stopped, we will not retry to 
transfer tsfile {}.",
-                tsFile,
-                exception);
-          } else {
-            LOGGER.warn(
-                "IoTDBThriftConnectorV2 failed to transfer tsfile {} after {} 
times, retrying...",
-                tsFile,
-                retryCount,
-                exception);
-
-            try {
-              position = 0;
-              reader = new RandomAccessFile(tsFile, "r");
-              isSealSignalSent.set(false);
-
-              connector.transfer(requestCommitId, this);
-            } catch (FileNotFoundException e) {
-              LOGGER.error("Exception occurred when retrying...", e);
-              onError(e);
-            }
-          }
-        });
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index d3967213e06..b1ab2b77e6b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.DirectoryNotEmptyException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.HashMap;
@@ -162,6 +163,8 @@ public class LoadTsFileManager {
     try {
       Files.delete(loadDirPath);
       LOGGER.info("Load dir {} was deleted.", loadDirPath);
+    } catch (DirectoryNotEmptyException e) {
+      LOGGER.info("Load dir {} is not empty, skip deleting.", loadDirPath);
     } catch (IOException e) {
       LOGGER.warn(MESSAGE_DELETE_FAIL, loadDirPath, e);
     }
@@ -179,6 +182,8 @@ public class LoadTsFileManager {
     try {
       Files.delete(loadDirPath);
       LOGGER.info("Load dir {} was deleted.", loadDirPath);
+    } catch (DirectoryNotEmptyException e) {
+      LOGGER.info("Load dir {} is not empty, skip deleting.", loadDirPath);
     } catch (IOException e) {
       LOGGER.warn(MESSAGE_DELETE_FAIL, loadDirPath, e);
     }
@@ -286,6 +291,8 @@ public class LoadTsFileManager {
       }
       try {
         Files.delete(taskDir.toPath());
+      } catch (DirectoryNotEmptyException e) {
+        LOGGER.info("Task dir {} is not empty, skip deleting.", 
taskDir.getPath());
       } catch (IOException e) {
         LOGGER.warn(MESSAGE_DELETE_FAIL, taskDir.getPath(), e);
       }
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 98104de3b5b..2a6ead060ec 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -1008,6 +1008,18 @@ cluster_name=defaultCluster
 # The size of the pending queue for the PipeConnector to store the events.
 # pipe_connector_pending_queue_size=1024
 
+# If the thrift RPC compression is enabled.
+# pipe_async_connector_rpc_thrift_compression_enabled=false
+
+# The maximum number of selectors that can be used in the async connector.
+# pipe_async_connector_selector_number=1
+
+# The core number of clients that can be used in the async connector.
+# pipe_async_connector_core_client_number=8
+
+# The maximum number of clients that can be used in the async connector.
+# pipe_async_connector_max_client_number=16
+
 # True if the pipe heartbeat is seperated from the cluster's heartbeat, false 
if the pipe heartbeat is
 # merged with the cluster's heartbeat.
 # pipe_heartbeat_seperated_mode_enabled=true
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index b0dc9ad2e91..72683e3a29d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -260,19 +260,21 @@ public class ClientPoolFactory {
     @Override
     public KeyedObjectPool<TEndPoint, AsyncPipeDataTransferServiceClient> 
createClientPool(
         ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> manager) {
-      GenericKeyedObjectPool<TEndPoint, AsyncPipeDataTransferServiceClient> 
clientPool =
+      final GenericKeyedObjectPool<TEndPoint, 
AsyncPipeDataTransferServiceClient> clientPool =
           new GenericKeyedObjectPool<>(
               new AsyncPipeDataTransferServiceClient.Factory(
                   manager,
                   new ThriftClientProperty.Builder()
-                      .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
-                      
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
-                      
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+                      .setConnectionTimeoutMs((int) 
conf.getPipeConnectorTimeoutMs())
+                      .setRpcThriftCompressionEnabled(
+                          
conf.isPipeAsyncConnectorRPCThriftCompressionEnabled())
+                      .setSelectorNumOfAsyncClientManager(
+                          conf.getPipeAsyncConnectorSelectorNumber())
                       .build(),
-                  ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
+                  ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
               new 
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
-                  
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
-                  
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
+                  
.setCoreClientNumForEachNode(conf.getPipeAsyncConnectorCoreClientNumber())
+                  
.setMaxClientNumForEachNode(conf.getPipeAsyncConnectorMaxClientNumber())
                   .build()
                   .getConfig());
       ClientManagerMetrics.getInstance()
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index 534349460f1..ad0f006b5e0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class AsyncPipeDataTransferServiceClient extends 
IClientRPCService.AsyncClient
     implements ThriftClient {
@@ -42,6 +43,9 @@ public class AsyncPipeDataTransferServiceClient extends 
IClientRPCService.AsyncC
   private static final Logger LOGGER =
       LoggerFactory.getLogger(AsyncPipeDataTransferServiceClient.class);
 
+  private static final AtomicInteger idGenerator = new AtomicInteger(0);
+  private final int id = idGenerator.incrementAndGet();
+
   private final boolean printLogWhenEncounterException;
 
   private final TEndPoint endpoint;
@@ -49,6 +53,8 @@ public class AsyncPipeDataTransferServiceClient extends 
IClientRPCService.AsyncC
 
   private final AtomicBoolean shouldReturnSelf = new AtomicBoolean(true);
 
+  private final AtomicBoolean isHandshakeFinished = new AtomicBoolean(false);
+
   public AsyncPipeDataTransferServiceClient(
       ThriftClientProperty property,
       TEndPoint endpoint,
@@ -81,7 +87,7 @@ public class AsyncPipeDataTransferServiceClient extends 
IClientRPCService.AsyncC
   @Override
   public void invalidate() {
     if (!hasError()) {
-      super.onError(new Exception("This client has been invalidated"));
+      super.onError(new Exception(String.format("This client %d has been 
invalidated", id)));
     }
   }
 
@@ -126,9 +132,18 @@ public class AsyncPipeDataTransferServiceClient extends 
IClientRPCService.AsyncC
     }
   }
 
+  public boolean isHandshakeFinished() {
+    return isHandshakeFinished.get();
+  }
+
+  public void markHandshakeFinished() {
+    isHandshakeFinished.set(true);
+    LOGGER.info("Handshake finished for client {}", this);
+  }
+
   @Override
   public String toString() {
-    return String.format("AsyncPipeDataTransferServiceClient{%s}", endpoint);
+    return String.format("AsyncPipeDataTransferServiceClient{%s}, id = {%d}", 
endpoint, id);
   }
 
   public static class Factory
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 6464d6129a3..a4ee9044800 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -129,6 +129,8 @@ public enum ThreadName {
   PIPE_RUNTIME_META_SYNCER("Pipe-Runtime-Meta-Syncer"),
   PIPE_RUNTIME_HEARTBEAT("Pipe-Runtime-Heartbeat"),
   PIPE_RUNTIME_PROCEDURE_SUBMITTER("Pipe-Runtime-Procedure-Submitter"),
+  PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
+  PIPE_ASYNC_CONNECTOR_RETRY_TRIGGER("Pipe-Async-Connector-Retry-Trigger"),
   PIPE_WAL_RESOURCE_TTL_CHECKER("Pipe-WAL-Resource-TTL-Checker"),
   WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
   STATEFUL_TRIGGER_INFORMATION_UPDATER("Stateful-Trigger-Information-Updater"),
@@ -259,7 +261,10 @@ public enum ThreadName {
               PIPE_CONNECTOR_EXECUTOR_POOL,
               PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL,
               PIPE_RUNTIME_META_SYNCER,
+              PIPE_RUNTIME_HEARTBEAT,
               PIPE_RUNTIME_PROCEDURE_SUBMITTER,
+              PIPE_ASYNC_CONNECTOR_CLIENT_POOL,
+              PIPE_ASYNC_CONNECTOR_RETRY_TRIGGER,
               PIPE_WAL_RESOURCE_TTL_CHECKER,
               WINDOW_EVALUATION_SERVICE,
               STATEFUL_TRIGGER_INFORMATION_UPDATER));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index eb8d5f7b2f7..6405201d26d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -167,6 +167,11 @@ public class CommonConfig {
   private long pipeConnectorRetryIntervalMs = 1000L;
   private int pipeConnectorPendingQueueSize = 1024;
 
+  private boolean pipeAsyncConnectorRPCThriftCompressionEnabled = false;
+  private int pipeAsyncConnectorSelectorNumber = 1;
+  private int pipeAsyncConnectorCoreClientNumber = 8;
+  private int pipeAsyncConnectorMaxClientNumber = 16;
+
   private boolean isSeperatedPipeHeartbeatEnabled = true;
   private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 100;
   private long pipeMetaSyncerInitialSyncDelayMinutes = 3;
@@ -554,6 +559,40 @@ public class CommonConfig {
     this.pipeConnectorReadFileBufferSize = pipeConnectorReadFileBufferSize;
   }
 
+  public void setPipeAsyncConnectorRPCThriftCompressionEnabled(
+      boolean pipeAsyncConnectorRPCThriftCompressionEnabled) {
+    this.pipeAsyncConnectorRPCThriftCompressionEnabled =
+        pipeAsyncConnectorRPCThriftCompressionEnabled;
+  }
+
+  public boolean isPipeAsyncConnectorRPCThriftCompressionEnabled() {
+    return pipeAsyncConnectorRPCThriftCompressionEnabled;
+  }
+
+  public int getPipeAsyncConnectorSelectorNumber() {
+    return pipeAsyncConnectorSelectorNumber;
+  }
+
+  public void setPipeAsyncConnectorSelectorNumber(int 
pipeAsyncConnectorSelectorNumber) {
+    this.pipeAsyncConnectorSelectorNumber = pipeAsyncConnectorSelectorNumber;
+  }
+
+  public int getPipeAsyncConnectorCoreClientNumber() {
+    return pipeAsyncConnectorCoreClientNumber;
+  }
+
+  public void setPipeAsyncConnectorCoreClientNumber(int 
pipeAsyncConnectorCoreClientNumber) {
+    this.pipeAsyncConnectorCoreClientNumber = 
pipeAsyncConnectorCoreClientNumber;
+  }
+
+  public int getPipeAsyncConnectorMaxClientNumber() {
+    return pipeAsyncConnectorMaxClientNumber;
+  }
+
+  public void setPipeAsyncConnectorMaxClientNumber(int 
pipeAsyncConnectorMaxClientNumber) {
+    this.pipeAsyncConnectorMaxClientNumber = pipeAsyncConnectorMaxClientNumber;
+  }
+
   public boolean isSeperatedPipeHeartbeatEnabled() {
     return isSeperatedPipeHeartbeatEnabled;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index f45c92d479c..d29c0333a59 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -327,6 +327,27 @@ public class CommonDescriptor {
                 "pipe_connector_pending_queue_size",
                 String.valueOf(config.getPipeConnectorPendingQueueSize()))));
 
+    config.setPipeAsyncConnectorRPCThriftCompressionEnabled(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "pipe_async_connector_rpc_thrift_compression_enable",
+                
String.valueOf(config.isPipeAsyncConnectorRPCThriftCompressionEnabled()))));
+    config.setPipeAsyncConnectorSelectorNumber(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_async_connector_selector_number",
+                
String.valueOf(config.getPipeAsyncConnectorSelectorNumber()))));
+    config.setPipeAsyncConnectorCoreClientNumber(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_async_connector_core_client_number",
+                
String.valueOf(config.getPipeAsyncConnectorCoreClientNumber()))));
+    config.setPipeAsyncConnectorMaxClientNumber(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_async_connector_max_client_number",
+                
String.valueOf(config.getPipeAsyncConnectorMaxClientNumber()))));
+
     config.setSeperatedPipeHeartbeatEnabled(
         Boolean.parseBoolean(
             properties.getProperty(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 9cb9f4d70c0..db33599278b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -107,6 +107,22 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeConnectorPendingQueueSize();
   }
 
+  public boolean isPipeAsyncConnectorRPCThriftCompressionEnabled() {
+    return COMMON_CONFIG.isPipeAsyncConnectorRPCThriftCompressionEnabled();
+  }
+
+  public int getPipeAsyncConnectorSelectorNumber() {
+    return COMMON_CONFIG.getPipeAsyncConnectorSelectorNumber();
+  }
+
+  public int getPipeAsyncConnectorCoreClientNumber() {
+    return COMMON_CONFIG.getPipeAsyncConnectorCoreClientNumber();
+  }
+
+  public int getPipeAsyncConnectorMaxClientNumber() {
+    return COMMON_CONFIG.getPipeAsyncConnectorMaxClientNumber();
+  }
+
   /////////////////////////////// Meta Consistency 
///////////////////////////////
 
   public boolean isSeperatedPipeHeartbeatEnabled() {

Reply via email to