This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a87fc6dd51c IoTV2: Refine receiver and sender transfer logic to 
prevent stuck (#15569)
a87fc6dd51c is described below

commit a87fc6dd51c94ceae1ac4fda53e4821c35a85712
Author: Peng Junzhi <[email protected]>
AuthorDate: Wed May 28 11:40:23 2025 +0800

    IoTV2: Refine receiver and sender transfer logic to prevent stuck (#15569)
---
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 .../apache/iotdb/consensus/ConsensusFactory.java   |   4 +-
 .../apache/iotdb/consensus/pipe/PipeConsensus.java |   7 +-
 .../pipeconsensus/PipeConsensusAsyncConnector.java | 350 ++++++++++++++-------
 .../pipeconsensus/PipeConsensusSyncConnector.java  |   4 +-
 ...r.java => PipeConsensusDeleteEventHandler.java} |  64 ++--
 .../PipeConsensusTabletBatchEventHandler.java      |   2 +-
 .../PipeConsensusTabletInsertionEventHandler.java  |  21 +-
 .../PipeConsensusTsFileInsertionEventHandler.java  |  10 +
 .../PipeConsensusTransferBatchReqBuilder.java      |   6 +-
 .../pipeconsensus/PipeConsensusReceiver.java       | 138 +++++---
 ...ner.java => IoTV2GlobalComponentContainer.java} |  45 ++-
 .../iotdb/commons/concurrent/ThreadName.java       |   1 +
 .../iotdb/commons/pipe/event/EnrichedEvent.java    |  10 +
 .../pipe/receiver/PipeReceiverStatusHandler.java   |   5 +
 .../org/apache/iotdb/commons/utils/RetryUtils.java |  12 +
 16 files changed, 457 insertions(+), 223 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index d49fbd04bb1..d054a31e74d 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -319,6 +319,7 @@ public enum TSStatusCode {
   CONSENSUS_GROUP_NOT_EXIST(2206),
   RATIS_READ_UNAVAILABLE(2207),
   PIPE_CONSENSUS_CLOSE_ERROR(2208),
+  PIPE_CONSENSUS_WAIT_ORDER_TIMEOUT(2209),
   ;
 
   private final int statusCode;
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
index 6196b37f123..a9dd2c1e5ea 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.consensus;
 
-import 
org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
+import org.apache.iotdb.commons.client.container.IoTV2GlobalComponentContainer;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.pipe.metric.PipeConsensusSyncLagManager;
 
@@ -55,7 +55,7 @@ public class ConsensusFactory {
       if (className.equals(IOT_CONSENSUS_V2)) {
         className = REAL_PIPE_CONSENSUS;
         // initialize iotConsensusV2's thrift component
-        PipeConsensusClientMgrContainer.build();
+        IoTV2GlobalComponentContainer.build();
         // initialize iotConsensusV2's metric component
         PipeConsensusSyncLagManager.build();
       }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index 045b6db26ec..e7a10d3b9d3 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
-import 
org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
+import org.apache.iotdb.commons.client.container.IoTV2GlobalComponentContainer;
 import org.apache.iotdb.commons.client.sync.SyncPipeConsensusServiceClient;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.exception.StartupException;
@@ -121,9 +121,9 @@ public class PipeConsensus implements IConsensus {
     this.consensusPipeGuardian =
         config.getPipeConsensusConfig().getPipe().getConsensusPipeGuardian();
     this.asyncClientManager =
-        
PipeConsensusClientMgrContainer.getInstance().getGlobalAsyncClientManager();
+        
IoTV2GlobalComponentContainer.getInstance().getGlobalAsyncClientManager();
     this.syncClientManager =
-        
PipeConsensusClientMgrContainer.getInstance().getGlobalSyncClientManager();
+        
IoTV2GlobalComponentContainer.getInstance().getGlobalSyncClientManager();
   }
 
   @Override
@@ -238,6 +238,7 @@ public class PipeConsensus implements IConsensus {
     registerManager.deregisterAll();
     consensusPipeGuardian.stop();
     
stateMachineMap.values().parallelStream().forEach(PipeConsensusServerImpl::stop);
+    IoTV2GlobalComponentContainer.getInstance().stopBackgroundTaskService();
   }
 
   private void checkAllConsensusPipe() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
index 49a9241c1e3..0d3fc73bfc0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
-import 
org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
+import org.apache.iotdb.commons.client.container.IoTV2GlobalComponentContainer;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
@@ -40,17 +40,19 @@ import 
org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusDeleteEventHandler;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusTabletBatchEventHandler;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusTabletInsertNodeEventHandler;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusTsFileInsertionEventHandler;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.builder.PipeConsensusAsyncBatchReqBuilder;
+import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusDeleteNodeReq;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
 import org.apache.iotdb.db.pipe.consensus.metric.PipeConsensusConnectorMetrics;
+import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
-import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.pipe.api.annotation.TableModel;
@@ -67,11 +69,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -95,10 +100,14 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
   private static final IoTDBConfig IOTDB_CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
   private static final long PIPE_CONSENSUS_EVENT_ENQUEUE_TIMEOUT_IN_MS =
       IOTDB_CONFIG.getConnectionTimeoutInMS() / 6;
-  private final BlockingQueue<Event> retryEventQueue = new 
LinkedBlockingQueue<>();
+  private final Queue<EnrichedEvent> retryEventQueue =
+      new PriorityQueue<>(
+          IOTDB_CONFIG.getIotConsensusV2PipelineSize(),
+          Comparator.comparingLong(EnrichedEvent::getReplicateIndexForIoTV2));
   // We use enrichedEvent here to make use of 
EnrichedEvent.equalsInPipeConsensus
   private final BlockingQueue<EnrichedEvent> transferBuffer =
       new LinkedBlockingDeque<>(IOTDB_CONFIG.getIotConsensusV2PipelineSize());
+  private ScheduledExecutorService backgroundTaskService;
   private final AtomicBoolean isClosed = new AtomicBoolean(false);
   private final int thisDataNodeId = 
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
   private PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics;
@@ -147,7 +156,7 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
             nodeUrls, consensusGroupId, thisDataNodeId, 
pipeConsensusConnectorMetrics);
     retryConnector.customize(parameters, configuration);
     asyncTransferClientManager =
-        
PipeConsensusClientMgrContainer.getInstance().getGlobalAsyncClientManager();
+        
IoTV2GlobalComponentContainer.getInstance().getGlobalAsyncClientManager();
 
     if (isTabletBatchModeEnabled) {
       tabletBatchBuilder =
@@ -159,6 +168,8 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
 
     // currently, tablet batch is false by default in PipeConsensus;
     isTabletBatchModeEnabled = false;
+    this.backgroundTaskService =
+        IoTV2GlobalComponentContainer.getInstance().getBackgroundTaskService();
   }
 
   /**
@@ -248,7 +259,7 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
 
   @Override
   public void transfer(TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
-    syncTransferQueuedEventsIfNecessary();
+    asyncTransferQueuedEventsIfNecessary();
 
     boolean enqueueResult = addEvent2Buffer((EnrichedEvent) 
tabletInsertionEvent);
     if (!enqueueResult) {
@@ -267,45 +278,7 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
         tabletBatchBuilder.onSuccess();
       }
     } else {
-      TCommitId tCommitId;
-      TConsensusGroupId tConsensusGroupId =
-          new TConsensusGroupId(TConsensusGroupType.DataRegion, 
consensusGroupId);
-      // tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
-      final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent =
-          (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
-      tCommitId =
-          new TCommitId(
-              pipeInsertNodeTabletInsertionEvent.getReplicateIndexForIoTV2(),
-              
pipeInsertNodeTabletInsertionEvent.getCommitterKey().getRestartTimes(),
-              pipeInsertNodeTabletInsertionEvent.getRebootTimes());
-
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
-          PipeConsensusAsyncConnector.class.getName())) {
-        return;
-      }
-
-      final InsertNode insertNode =
-          pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
-      final ProgressIndex progressIndex = 
pipeInsertNodeTabletInsertionEvent.getProgressIndex();
-      final TPipeConsensusTransferReq pipeConsensusTransferReq =
-          Objects.isNull(insertNode)
-              ? PipeConsensusTabletBinaryReq.toTPipeConsensusTransferReq(
-                  pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
-                  tCommitId,
-                  tConsensusGroupId,
-                  progressIndex,
-                  thisDataNodeId)
-              : PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(
-                  insertNode, tCommitId, tConsensusGroupId, progressIndex, 
thisDataNodeId);
-      final PipeConsensusTabletInsertNodeEventHandler 
pipeConsensusInsertNodeReqHandler =
-          new PipeConsensusTabletInsertNodeEventHandler(
-              pipeInsertNodeTabletInsertionEvent,
-              pipeConsensusTransferReq,
-              this,
-              pipeConsensusConnectorMetrics);
-
-      transfer(pipeConsensusInsertNodeReqHandler);
+      transferInEventWithoutCheck((PipeInsertionEvent) tabletInsertionEvent);
     }
   }
 
@@ -321,6 +294,50 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
     }
   }
 
+  private boolean transferInEventWithoutCheck(PipeInsertionEvent 
tabletInsertionEvent)
+      throws Exception {
+    // tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
+    final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent =
+        (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
+        PipeConsensusAsyncConnector.class.getName())) {
+      return false;
+    }
+
+    TCommitId tCommitId;
+    TConsensusGroupId tConsensusGroupId =
+        new TConsensusGroupId(TConsensusGroupType.DataRegion, 
consensusGroupId);
+    tCommitId =
+        new TCommitId(
+            pipeInsertNodeTabletInsertionEvent.getReplicateIndexForIoTV2(),
+            
pipeInsertNodeTabletInsertionEvent.getCommitterKey().getRestartTimes(),
+            pipeInsertNodeTabletInsertionEvent.getRebootTimes());
+
+    final InsertNode insertNode =
+        pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
+    final ProgressIndex progressIndex = 
pipeInsertNodeTabletInsertionEvent.getProgressIndex();
+    final TPipeConsensusTransferReq pipeConsensusTransferReq =
+        Objects.isNull(insertNode)
+            ? PipeConsensusTabletBinaryReq.toTPipeConsensusTransferReq(
+                pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
+                tCommitId,
+                tConsensusGroupId,
+                progressIndex,
+                thisDataNodeId)
+            : PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(
+                insertNode, tCommitId, tConsensusGroupId, progressIndex, 
thisDataNodeId);
+    final PipeConsensusTabletInsertNodeEventHandler 
pipeConsensusInsertNodeReqHandler =
+        new PipeConsensusTabletInsertNodeEventHandler(
+            pipeInsertNodeTabletInsertionEvent,
+            pipeConsensusTransferReq,
+            this,
+            pipeConsensusConnectorMetrics);
+
+    transfer(pipeConsensusInsertNodeReqHandler);
+    return true;
+  }
+
   private void transfer(
       final PipeConsensusTabletInsertNodeEventHandler 
pipeConsensusInsertNodeReqHandler) {
     AsyncPipeConsensusServiceClient client = null;
@@ -335,7 +352,7 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
 
   @Override
   public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
-    syncTransferQueuedEventsIfNecessary();
+    asyncTransferQueuedEventsIfNecessary();
     transferBatchedEventsIfNecessary();
 
     if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
@@ -350,8 +367,19 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
       throw new PipeRuntimeConnectorRetryTimesConfigurableException(
           ENQUEUE_EXCEPTION_MSG, Integer.MAX_VALUE);
     }
+
+    transferWithoutCheck(tsFileInsertionEvent);
+  }
+
+  private boolean transferWithoutCheck(TsFileInsertionEvent 
tsFileInsertionEvent) throws Exception {
     final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
         (PipeTsFileInsertionEvent) tsFileInsertionEvent;
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeTsFileInsertionEvent.increaseReferenceCount(
+        PipeConsensusAsyncConnector.class.getName())) {
+      return false;
+    }
+
     TCommitId tCommitId =
         new TCommitId(
             pipeTsFileInsertionEvent.getReplicateIndexForIoTV2(),
@@ -359,11 +387,6 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
             pipeTsFileInsertionEvent.getRebootTimes());
     TConsensusGroupId tConsensusGroupId =
         new TConsensusGroupId(TConsensusGroupType.DataRegion, 
consensusGroupId);
-    // We increase the reference count for this event to determine if the 
event may be released.
-    if (!pipeTsFileInsertionEvent.increaseReferenceCount(
-        PipeConsensusAsyncConnector.class.getName())) {
-      return;
-    }
 
     try {
       // Just in case. To avoid the case that exception occurred when 
constructing the handler.
@@ -382,6 +405,7 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
               pipeConsensusConnectorMetrics);
 
       transfer(pipeConsensusTsFileInsertionEventHandler);
+      return true;
     } catch (Exception e) {
       // Just in case. To avoid the case that exception occurred when 
constructing the handler.
       pipeTsFileInsertionEvent.decreaseReferenceCount(
@@ -408,7 +432,7 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
    */
   @Override
   public void transfer(Event event) throws Exception {
-    syncTransferQueuedEventsIfNecessary();
+    asyncTransferQueuedEventsIfNecessary();
     transferBatchedEventsIfNecessary();
 
     // Transfer deletion
@@ -419,10 +443,8 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
         throw new PipeRuntimeConnectorRetryTimesConfigurableException(
             ENQUEUE_EXCEPTION_MSG, Integer.MAX_VALUE);
       }
-      retryConnector.transfer(event);
-      // Since transfer method will throw an exception if transfer failed, 
removeEventFromBuffer
-      // will not be executed when transfer failed.
-      this.removeEventFromBuffer(deleteDataNodeEvent);
+
+      transferDeletion(deleteDataNodeEvent);
       return;
     }
 
@@ -432,6 +454,48 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
     }
   }
 
+  private boolean transferDeletion(PipeDeleteDataNodeEvent 
pipeDeleteDataNodeEvent) {
+    // We increase the reference count for this event to determine if the 
event may be released.
+    if (!pipeDeleteDataNodeEvent.increaseReferenceCount(
+        PipeConsensusSyncConnector.class.getName())) {
+      return false;
+    }
+
+    final ProgressIndex progressIndex = 
pipeDeleteDataNodeEvent.getProgressIndex();
+    final TCommitId tCommitId =
+        new TCommitId(
+            pipeDeleteDataNodeEvent.getReplicateIndexForIoTV2(),
+            pipeDeleteDataNodeEvent.getCommitterKey().getRestartTimes(),
+            pipeDeleteDataNodeEvent.getRebootTimes());
+    final TConsensusGroupId tConsensusGroupId =
+        new TConsensusGroupId(TConsensusGroupType.DataRegion, 
consensusGroupId);
+
+    final TPipeConsensusTransferReq pipeConsensusTransferReq =
+        PipeConsensusDeleteNodeReq.toTPipeConsensusTransferReq(
+            pipeDeleteDataNodeEvent.getDeleteDataNode(),
+            tCommitId,
+            tConsensusGroupId,
+            progressIndex,
+            thisDataNodeId);
+    final PipeConsensusDeleteEventHandler pipeConsensusDeleteEventHandler =
+        new PipeConsensusDeleteEventHandler(
+            pipeDeleteDataNodeEvent, pipeConsensusTransferReq, this, 
pipeConsensusConnectorMetrics);
+
+    transfer(pipeConsensusDeleteEventHandler);
+    return true;
+  }
+
+  private void transfer(final PipeConsensusDeleteEventHandler 
pipeConsensusDeleteEventHandler) {
+    AsyncPipeConsensusServiceClient client = null;
+    try {
+      client = asyncTransferClientManager.borrowClient(getFollowerUrl());
+      pipeConsensusDeleteEventHandler.transfer(client);
+    } catch (final Exception ex) {
+      logOnClientException(client, ex);
+      pipeConsensusDeleteEventHandler.onError(ex);
+    }
+  }
+
   /** Try its best to commit data in order. Flush can also be a trigger to 
transfer batched data. */
   private void transferBatchedEventsIfNecessary() throws IOException {
     if (!isTabletBatchModeEnabled || tabletBatchBuilder.isEmpty()) {
@@ -444,88 +508,136 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
     tabletBatchBuilder.onSuccess();
   }
 
-  /**
-   * Transfer queued {@link Event}s which are waiting for retry.
-   *
-   * @throws Exception if an error occurs. The error will be handled by pipe 
framework, which will
-   *     retry the {@link Event} and mark the {@link Event} as failure and 
stop the pipe if the
-   *     retry times exceeds the threshold.
-   */
-  private void syncTransferQueuedEventsIfNecessary() throws Exception {
+  /** Transfer queued {@link Event}s which are waiting for retry. */
+  private void asyncTransferQueuedEventsIfNecessary() {
+    long retryStartTime = System.currentTimeMillis();
     while (!retryEventQueue.isEmpty()) {
       synchronized (this) {
         if (isClosed.get() || retryEventQueue.isEmpty()) {
           return;
         }
-
-        final Event peekedEvent = retryEventQueue.peek();
-        // do transfer
-        if (peekedEvent instanceof PipeInsertNodeTabletInsertionEvent) {
-          retryConnector.transfer((PipeInsertNodeTabletInsertionEvent) 
peekedEvent);
-        } else if (peekedEvent instanceof PipeRawTabletInsertionEvent) {
-          retryConnector.transfer((PipeRawTabletInsertionEvent) peekedEvent);
-        } else if (peekedEvent instanceof PipeTsFileInsertionEvent) {
-          retryConnector.transfer((PipeTsFileInsertionEvent) peekedEvent);
-        } else {
-          if (LOGGER.isWarnEnabled()) {
-            LOGGER.warn(
-                "PipeConsensusAsyncConnector does not support transfer generic 
event: {}.",
-                peekedEvent);
-          }
-        }
-        // release resource
-        if (peekedEvent instanceof EnrichedEvent) {
-          ((EnrichedEvent) peekedEvent)
-              
.decreaseReferenceCount(PipeConsensusAsyncConnector.class.getName(), true);
+        if (System.currentTimeMillis() - retryStartTime > 
TimeUnit.SECONDS.toMillis(20)) {
+          // just in case that some events are polled and re-added into queue 
again and again,
+          // causing this loop to run forever.
+          LOGGER.warn(
+              "PipeConsensus-ConsensusGroup-{}: retryEventQueue is not empty 
after 20 seconds. retryQueue size: {}",
+              consensusGroupId,
+              retryEventQueue.size());
+          return;
         }
 
-        final Event polledEvent = retryEventQueue.poll();
-        if (polledEvent != peekedEvent) {
-          if (LOGGER.isErrorEnabled()) {
-            LOGGER.error(
-                "The event polled from the queue is not the same as the event 
peeked from the queue. "
-                    + "Peeked event: {}, polled event: {}.",
-                peekedEvent,
-                polledEvent);
-          }
-        }
-        if (polledEvent != null) {
-          if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Polled event {} from retry queue.", polledEvent);
-          }
-          // poll it from transferBuffer
-          removeEventFromBuffer((EnrichedEvent) polledEvent);
-        }
+        // remove this event from queue. If retry fail as well, event will be 
re-added into
+        // retryQueue.
+        final EnrichedEvent peekedEvent = retryEventQueue.poll();
+        // retry with interval when necessarily
+        long retryInterval =
+            peekedEvent.getRetryInterval() > 
EnrichedEvent.INITIAL_RETRY_INTERVAL_FOR_IOTV2
+                ? peekedEvent.getRetryInterval()
+                : 0L;
+        LOGGER.info(
+            "PipeConsensus-ConsensusGroup-{}: retry with interval {} for {}",
+            consensusGroupId,
+            retryInterval,
+            peekedEvent);
+        // need to retry in background service, otherwise the retryInterval 
will block the sender
+        // procedure.
+        backgroundTaskService.schedule(
+            () -> {
+              // do transfer
+              if (peekedEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+                retryTransfer((PipeInsertNodeTabletInsertionEvent) 
peekedEvent);
+              } else if (peekedEvent instanceof PipeTsFileInsertionEvent) {
+                retryTransfer((PipeTsFileInsertionEvent) peekedEvent);
+              } else if (peekedEvent instanceof PipeDeleteDataNodeEvent) {
+                retryTransfer((PipeDeleteDataNodeEvent) peekedEvent);
+              } else {
+                if (LOGGER.isWarnEnabled()) {
+                  LOGGER.warn(
+                      "PipeConsensusAsyncConnector does not support transfer 
generic event: {}.",
+                      peekedEvent);
+                }
+              }
+            },
+            retryInterval,
+            TimeUnit.MILLISECONDS);
       }
     }
   }
 
+  private void retryTransfer(final PipeInsertionEvent tabletInsertionEvent) {
+    // TODO: batch transfer
+    try {
+      if (transferInEventWithoutCheck(tabletInsertionEvent)) {
+        tabletInsertionEvent.decreaseReferenceCount(
+            PipeConsensusAsyncConnector.class.getName(), false);
+      } else {
+        addFailureEventToRetryQueue(tabletInsertionEvent);
+      }
+    } catch (final Exception e) {
+      tabletInsertionEvent.decreaseReferenceCount(
+          PipeConsensusAsyncConnector.class.getName(), false);
+      addFailureEventToRetryQueue(tabletInsertionEvent);
+    }
+  }
+
+  private void retryTransfer(final PipeTsFileInsertionEvent 
tsFileInsertionEvent) {
+    try {
+      if (transferWithoutCheck(tsFileInsertionEvent)) {
+        tsFileInsertionEvent.decreaseReferenceCount(
+            PipeConsensusAsyncConnector.class.getName(), false);
+      } else {
+        addFailureEventToRetryQueue(tsFileInsertionEvent);
+      }
+    } catch (final Exception e) {
+      tsFileInsertionEvent.decreaseReferenceCount(
+          PipeConsensusAsyncConnector.class.getName(), false);
+      addFailureEventToRetryQueue(tsFileInsertionEvent);
+    }
+  }
+
+  private void retryTransfer(final PipeDeleteDataNodeEvent 
deleteDataNodeEvent) {
+    try {
+      if (transferDeletion(deleteDataNodeEvent)) {
+        deleteDataNodeEvent.decreaseReferenceCount(
+            PipeConsensusAsyncConnector.class.getName(), false);
+      } else {
+        addFailureEventToRetryQueue(deleteDataNodeEvent);
+      }
+    } catch (final Exception e) {
+      deleteDataNodeEvent.decreaseReferenceCount(
+          PipeConsensusAsyncConnector.class.getName(), false);
+      addFailureEventToRetryQueue(deleteDataNodeEvent);
+    }
+  }
+
   /**
    * Add failure event to retry queue.
    *
    * @param event event to retry
    */
   @SuppressWarnings("java:S899")
-  public void addFailureEventToRetryQueue(final Event event) {
+  public void addFailureEventToRetryQueue(final EnrichedEvent event) {
+    if (event.isReleased()) {
+      return;
+    }
+
     if (isClosed.get()) {
-      if (event instanceof EnrichedEvent) {
-        ((EnrichedEvent) 
event).clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
-      }
+      event.clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
+      return;
+    }
+    // just in case
+    if (retryEventQueue.contains(event)) {
       return;
     }
 
     retryEventQueue.offer(event);
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug(
-          "PipeConsensus-ConsensusGroup-{}: Event {} transfer failed, will be 
added to retry queue.",
-          consensusGroupId,
-          event);
-    }
+    LOGGER.info(
+        "PipeConsensus-ConsensusGroup-{}: Event {} transfer failed, will be 
added to retry queue.",
+        consensusGroupId,
+        event);
 
     if (isClosed.get()) {
-      if (event instanceof EnrichedEvent) {
-        ((EnrichedEvent) 
event).clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
-      }
+      event.clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
     }
   }
 
@@ -534,18 +646,16 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector implements Conse
    *
    * @param events events to retry
    */
-  public void addFailureEventsToRetryQueue(final Iterable<Event> events) {
-    for (final Event event : events) {
+  public void addFailureEventsToRetryQueue(final Iterable<EnrichedEvent> 
events) {
+    for (final EnrichedEvent event : events) {
       addFailureEventToRetryQueue(event);
     }
   }
 
   public synchronized void clearRetryEventsReferenceCount() {
     while (!retryEventQueue.isEmpty()) {
-      final Event event = retryEventQueue.poll();
-      if (event instanceof EnrichedEvent) {
-        ((EnrichedEvent) 
event).clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
-      }
+      final EnrichedEvent event = retryEventQueue.poll();
+      event.clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
index d5ba9d04120..02a632ec84d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
-import 
org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
+import org.apache.iotdb.commons.client.container.IoTV2GlobalComponentContainer;
 import org.apache.iotdb.commons.client.sync.SyncPipeConsensusServiceClient;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
@@ -97,7 +97,7 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
     this.consensusGroupId = consensusGroupId;
     this.thisDataNodeId = thisDataNodeId;
     this.syncRetryClientManager =
-        
PipeConsensusClientMgrContainer.getInstance().getGlobalSyncClientManager();
+        
IoTV2GlobalComponentContainer.getInstance().getGlobalSyncClientManager();
     this.pipeConsensusConnectorMetrics = pipeConsensusConnectorMetrics;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusDeleteEventHandler.java
similarity index 63%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusDeleteEventHandler.java
index 609e7e5e509..aa0734c883e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusDeleteEventHandler.java
@@ -21,13 +21,12 @@ package 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
-import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
 import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
-import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletInsertionEventHandler;
 import org.apache.iotdb.db.pipe.consensus.metric.PipeConsensusConnectorMetrics;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -36,23 +35,23 @@ import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class PipeConsensusTabletInsertionEventHandler<E extends 
TPipeConsensusTransferResp>
-    implements AsyncMethodCallback<E> {
+public class PipeConsensusDeleteEventHandler
+    implements AsyncMethodCallback<TPipeConsensusTransferResp> {
   private static final Logger LOGGER =
-      LoggerFactory.getLogger(PipeConsensusTabletInsertionEventHandler.class);
+      LoggerFactory.getLogger(PipeConsensusDeleteEventHandler.class);
 
-  protected final TabletInsertionEvent event;
+  private final PipeDeleteDataNodeEvent event;
 
-  protected final TPipeConsensusTransferReq req;
+  private final TPipeConsensusTransferReq req;
 
-  protected final PipeConsensusAsyncConnector connector;
+  private final PipeConsensusAsyncConnector connector;
 
-  protected final PipeConsensusConnectorMetrics metric;
+  private final PipeConsensusConnectorMetrics metric;
 
   private final long createTime;
 
-  protected PipeConsensusTabletInsertionEventHandler(
-      TabletInsertionEvent event,
+  public PipeConsensusDeleteEventHandler(
+      PipeDeleteDataNodeEvent event,
       TPipeConsensusTransferReq req,
       PipeConsensusAsyncConnector connector,
       PipeConsensusConnectorMetrics metric) {
@@ -64,12 +63,9 @@ public abstract class 
PipeConsensusTabletInsertionEventHandler<E extends TPipeCo
   }
 
   public void transfer(AsyncPipeConsensusServiceClient client) throws 
TException {
-    doTransfer(client, req);
+    client.pipeConsensusTransfer(req, this);
   }
 
-  protected abstract void doTransfer(
-      AsyncPipeConsensusServiceClient client, TPipeConsensusTransferReq req) 
throws TException;
-
   @Override
   public void onComplete(TPipeConsensusTransferResp response) {
     // Just in case
@@ -85,16 +81,13 @@ public abstract class 
PipeConsensusTabletInsertionEventHandler<E extends TPipeCo
           && status.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
         connector.statusHandler().handle(status, status.getMessage(), 
event.toString());
       }
-      if (event instanceof EnrichedEvent) {
-        ((EnrichedEvent) event)
-            
.decreaseReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName(),
 true);
-      }
+      
event.decreaseReferenceCount(PipeConsensusDeleteEventHandler.class.getName(), 
true);
 
       if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         LOGGER.info(
-            "InsertNodeTransfer: no.{} event successfully processed!",
-            ((EnrichedEvent) event).getReplicateIndexForIoTV2());
-        connector.removeEventFromBuffer((EnrichedEvent) event);
+            "DeleteNodeTransfer: no.{} event successfully processed!",
+            event.getReplicateIndexForIoTV2());
+        connector.removeEventFromBuffer(event);
       }
 
       long duration = System.nanoTime() - createTime;
@@ -105,16 +98,23 @@ public abstract class 
PipeConsensusTabletInsertionEventHandler<E extends TPipeCo
   }
 
   @Override
-  public void onError(Exception exception) {
+  public void onError(Exception e) {
     LOGGER.warn(
-        "Failed to transfer TabletInsertionEvent {} (committer key={}, 
replicate index={}).",
-        event instanceof EnrichedEvent
-            ? ((EnrichedEvent) event).coreReportMessage()
-            : event.toString(),
-        event instanceof EnrichedEvent ? ((EnrichedEvent) 
event).getCommitterKey() : null,
-        event instanceof EnrichedEvent ? ((EnrichedEvent) 
event).getReplicateIndexForIoTV2() : null,
-        exception);
-
+        "Failed to transfer PipeDeleteNodeEvent {} (committer key={}, 
replicate index={}).",
+        event.coreReportMessage(),
+        event.getCommitterKey(),
+        event.getReplicateIndexForIoTV2(),
+        e);
+
+    if (RetryUtils.needRetryWithIncreasingInterval(e)) {
+      // just in case for overflow
+      if (event.getRetryInterval() << 2 <= 0) {
+        event.setRetryInterval(1000L * 20);
+      } else {
+        event.setRetryInterval(Math.min(1000L * 20, event.getRetryInterval() 
<< 2));
+      }
+    }
+    // IoTV2 guarantees the event here is an EnrichedEvent (PipeDeleteDataNodeEvent),
+    // so it can be re-enqueued for retry directly.
     connector.addFailureEventToRetryQueue(event);
     metric.recordRetryCounter();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java
index a2c6406b4a1..31677862579 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java
@@ -46,7 +46,7 @@ public class PipeConsensusTabletBatchEventHandler
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeConsensusTabletBatchEventHandler.class);
   private final List<Long> requestCommitIds;
-  private final List<Event> events;
+  private final List<EnrichedEvent> events;
   private final TPipeConsensusBatchTransferReq req;
   private final PipeConsensusAsyncConnector connector;
   private final PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
index 609e7e5e509..482fea76140 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
 import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
@@ -106,16 +107,24 @@ public abstract class 
PipeConsensusTabletInsertionEventHandler<E extends TPipeCo
 
   @Override
   public void onError(Exception exception) {
+    EnrichedEvent event = (EnrichedEvent) this.event;
     LOGGER.warn(
         "Failed to transfer TabletInsertionEvent {} (committer key={}, 
replicate index={}).",
-        event instanceof EnrichedEvent
-            ? ((EnrichedEvent) event).coreReportMessage()
-            : event.toString(),
-        event instanceof EnrichedEvent ? ((EnrichedEvent) 
event).getCommitterKey() : null,
-        event instanceof EnrichedEvent ? ((EnrichedEvent) 
event).getReplicateIndexForIoTV2() : null,
+        event.coreReportMessage(),
+        event.getCommitterKey(),
+        event.getReplicateIndexForIoTV2(),
         exception);
 
-    connector.addFailureEventToRetryQueue(event);
+    if (RetryUtils.needRetryWithIncreasingInterval(exception)) {
+      // just in case for overflow
+      if (event.getRetryInterval() << 2 <= 0) {
+        event.setRetryInterval(1000L * 20);
+      } else {
+        event.setRetryInterval(Math.min(1000L * 20, event.getRetryInterval() 
<< 2));
+      }
+    }
+    // IoTV2 guarantees that only PipeInsertionEvent is used here, which is
+    // definitely an EnrichedEvent.
+    connector.addFailureEventToRetryQueue(event);
     metric.recordRetryCounter();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
index e3e2bf0ba05..c7ee3e2c0bd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import 
org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.response.PipeConsensusTransferFilePieceResp;
+import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.consensus.pipe.thrift.TCommitId;
 import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
@@ -292,6 +293,15 @@ public class PipeConsensusTsFileInsertionEventHandler
         event.getReplicateIndexForIoTV2(),
         exception);
 
+    if (RetryUtils.needRetryWithIncreasingInterval(exception)) {
+      // just in case for overflow
+      if (event.getRetryInterval() << 2 <= 0) {
+        event.setRetryInterval(1000L * 20);
+      } else {
+        event.setRetryInterval(Math.min(1000L * 20, event.getRetryInterval() 
<< 2));
+      }
+    }
+
     try {
       if (reader != null) {
         reader.close();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
index cf004b65f4d..978646fe8fb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java
@@ -60,7 +60,7 @@ public abstract class PipeConsensusTransferBatchReqBuilder 
implements AutoClosea
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeConsensusTransferBatchReqBuilder.class);
 
-  protected final List<Event> events = new ArrayList<>();
+  protected final List<EnrichedEvent> events = new ArrayList<>();
   protected final List<Long> requestCommitIds = new ArrayList<>();
   protected final List<TPipeConsensusTransferReq> batchReqs = new 
ArrayList<>();
   // limit in delayed time
@@ -138,7 +138,7 @@ public abstract class PipeConsensusTransferBatchReqBuilder 
implements AutoClosea
     // The deduplication logic here is to avoid the accumulation of the same 
event in a batch when
     // retrying.
     if ((events.isEmpty() || !events.get(events.size() - 1).equals(event))) {
-      events.add(event);
+      events.add((EnrichedEvent) event);
       requestCommitIds.add(requestCommitId);
       final int bufferSize = buildTabletInsertionBuffer(event);
 
@@ -179,7 +179,7 @@ public abstract class PipeConsensusTransferBatchReqBuilder 
implements AutoClosea
     return batchReqs.isEmpty();
   }
 
-  public List<Event> deepCopyEvents() {
+  public List<EnrichedEvent> deepCopyEvents() {
     return new ArrayList<>(events);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index d76b956b4d3..acadcfc2d53 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -101,7 +101,7 @@ public class PipeConsensusReceiver {
   private static final IoTDBConfig IOTDB_CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
   private static final long PIPE_CONSENSUS_RECEIVER_MAX_WAITING_TIME_IN_MS =
       (long) IOTDB_CONFIG.getConnectionTimeoutInMS()
-          / 6
+          / 3
           * IOTDB_CONFIG.getIotConsensusV2PipelineSize();
   private static final long CLOSE_TSFILE_WRITER_MAX_WAIT_TIME_IN_MS = 5000;
   private static final long RETRY_WAIT_TIME = 500;
@@ -1397,8 +1397,11 @@ public class PipeConsensusReceiver {
    * although events can arrive receiver in a random sequence.
    */
   private class RequestExecutor {
-    private static final String THIS_NODE = "this node";
-    private static final String PIPE_TASK = "pipe task";
+    private static final String MSG_NODE_RESTART_INDEX_STALE =
+        "sender dn restarts before this event was sent here";
+    private static final String MSG_PIPE_RESTART_INDEX_STALE =
+        "pipe task restarts before this event was sent here";
+    private static final String MSG_STALE_REPLICATE_INDEX = "replicate index is outdated";
 
     // An ordered set that buffers transfer requests' TCommitId, whose length 
is not larger than
     // PIPE_CONSENSUS_PIPELINE_SIZE.
@@ -1419,6 +1422,7 @@ public class PipeConsensusReceiver {
       this.reqExecutionOrderBuffer =
           new TreeSet<>(
               Comparator.comparingInt(RequestMeta::getDataNodeRebootTimes)
+                  .thenComparingInt(RequestMeta::getPipeTaskRestartTimes)
                   .thenComparingLong(RequestMeta::getReplicateIndex));
       this.lock = new ReentrantLock();
       this.condition = lock.newCondition();
@@ -1426,6 +1430,34 @@ public class PipeConsensusReceiver {
       this.tsFileWriterPool = tsFileWriterPool;
     }
 
+    private TPipeConsensusTransferResp preCheck(TCommitId tCommitId) {
+      // if a req is deprecated, we will discard it
+      // This case may happen in this scenario: leader has transferred {1,2} 
and is intending to
+      // transfer {3, 4, 5, 6}. And in one moment, follower has received {4, 
5, 6}, {3} is still
+      // transferring due to some network latency.
+      // At this time, leader restarts, and it will resend {3, 4, 5, 6} with 
incremental
+      // rebootTimes. If the {3} sent before the leader restart arrives after 
the follower
+      // receives
+      // the request with incremental rebootTimes, the {3} sent before the 
leader restart needs to
+      // be discarded.
+      if (tCommitId.getDataNodeRebootTimes() < connectorRebootTimes) {
+        return deprecatedResp(MSG_NODE_RESTART_INDEX_STALE, tCommitId);
+      }
+      // Similarly, check pipeTask restartTimes
+      if (tCommitId.getDataNodeRebootTimes() == connectorRebootTimes
+          && tCommitId.getPipeTaskRestartTimes() < pipeTaskRestartTimes) {
+        return deprecatedResp(MSG_PIPE_RESTART_INDEX_STALE, tCommitId);
+      }
+      // Similarly, check replicationIndex
+      if (tCommitId.getDataNodeRebootTimes() == connectorRebootTimes
+          && tCommitId.getPipeTaskRestartTimes() == pipeTaskRestartTimes
+          && tCommitId.getReplicateIndex() < onSyncedReplicateIndex + 1) {
+        return deprecatedResp(MSG_STALE_REPLICATE_INDEX, tCommitId);
+      }
+      // pass check
+      return null;
+    }
+
     private TPipeConsensusTransferResp onRequest(
         final TPipeConsensusTransferReq req,
         final boolean isTransferTsFilePiece,
@@ -1442,27 +1474,15 @@ public class PipeConsensusReceiver {
 
         TCommitId tCommitId = req.getCommitId();
         RequestMeta requestMeta = new RequestMeta(tCommitId);
+        TPipeConsensusTransferResp preCheckRes = preCheck(tCommitId);
+        if (preCheckRes != null) {
+          return preCheckRes;
+        }
+
         LOGGER.info(
             "PipeConsensus-PipeName-{}: start to receive no.{} event",
             consensusPipeName,
             tCommitId);
-        // if a req is deprecated, we will discard it
-        // This case may happen in this scenario: leader has transferred {1,2} 
and is intending to
-        // transfer {3, 4, 5, 6}. And in one moment, follower has received {4, 
5, 6}, {3} is still
-        // transferring due to some network latency.
-        // At this time, leader restarts, and it will resend {3, 4, 5, 6} with 
incremental
-        // rebootTimes. If the {3} sent before the leader restart arrives 
after the follower
-        // receives
-        // the request with incremental rebootTimes, the {3} sent before the 
leader restart needs to
-        // be discarded.
-        if (tCommitId.getDataNodeRebootTimes() < connectorRebootTimes) {
-          return deprecatedResp(THIS_NODE);
-        }
-        // Similarly, check pipeTask restartTimes
-        if (tCommitId.getDataNodeRebootTimes() == connectorRebootTimes
-            && tCommitId.getPipeTaskRestartTimes() < pipeTaskRestartTimes) {
-          return deprecatedResp(PIPE_TASK);
-        }
         // Judge whether connector has rebooted or not, if the rebootTimes 
increases compared to
         // connectorRebootTimes, need to reset receiver because connector has 
been restarted.
         if (tCommitId.getDataNodeRebootTimes() > connectorRebootTimes) {
@@ -1560,31 +1580,52 @@ public class PipeConsensusReceiver {
               // pipeTaskStartTimes or rebootTimes came in and refreshed the 
requestBuffer. In that
               // cases we need to discard these requests.
               if (!reqExecutionOrderBuffer.contains(requestMeta)) {
-                return deprecatedResp(String.format("%s or %s", THIS_NODE, 
PIPE_TASK));
+                return deprecatedResp(
+                    String.format(
+                        "%s or %s", MSG_NODE_RESTART_INDEX_STALE, 
MSG_PIPE_RESTART_INDEX_STALE),
+                    tCommitId);
               }
-              // If the buffer is not full after waiting timeout, we suppose 
that the sender will
-              // not send any more events at this time, that is, the sender 
has sent all events. At
-              // this point we apply the event at reqBuffer's peek
-              if (timeout
-                  && reqExecutionOrderBuffer.size() < 
IOTDB_CONFIG.getIotConsensusV2PipelineSize()
-                  && reqExecutionOrderBuffer.first() != null
-                  && reqExecutionOrderBuffer.first().equals(requestMeta)) {
-                // TODO: Turn it to debug after GA
-                LOGGER.info(
-                    "PipeConsensus-PipeName-{}: no.{} event get executed after 
awaiting timeout, current receiver syncIndex: {}",
-                    consensusPipeName,
-                    tCommitId,
-                    onSyncedReplicateIndex);
-                long startApplyNanos = System.nanoTime();
-                metric.recordDispatchWaitingTimer(startApplyNanos - 
startDispatchNanos);
-                requestMeta.setStartApplyNanos(startApplyNanos);
-                TPipeConsensusTransferResp resp = loadEvent(req);
-
-                if (resp != null
-                    && resp.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-                  onSuccess(tCommitId, isTransferTsFileSeal);
+              // After waiting timeout, we suppose that the sender will not 
send any more events at
+              // this time, that is, the sender has sent all events. At this 
point we apply the
+              // event at reqBuffer's peek
+              if (timeout && reqExecutionOrderBuffer.first() != null) {
+                // if current event is the first event in reqBuffer, we can 
process it.
+                if (reqExecutionOrderBuffer.first().equals(requestMeta)) {
+                  // TODO: Turn it to debug after GA
+                  LOGGER.info(
+                      "PipeConsensus-PipeName-{}: no.{} event get executed 
after awaiting timeout, current receiver syncIndex: {}",
+                      consensusPipeName,
+                      tCommitId,
+                      onSyncedReplicateIndex);
+                  long startApplyNanos = System.nanoTime();
+                  metric.recordDispatchWaitingTimer(startApplyNanos - 
startDispatchNanos);
+                  requestMeta.setStartApplyNanos(startApplyNanos);
+                  TPipeConsensusTransferResp resp = loadEvent(req);
+
+                  if (resp != null
+                      && resp.getStatus().getCode()
+                          == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+                    onSuccess(tCommitId, isTransferTsFileSeal);
+                  }
+                  return resp;
+                }
+                // if current event is not the first event in reqBuffer, we 
should return an error
+                // code to let leader retry or proceed instead of getting 
stuck in this while loop
+                // and block sender.
+                else {
+                  final TSStatus status =
+                      new TSStatus(
+                          RpcUtils.getStatus(
+                              TSStatusCode.PIPE_CONSENSUS_WAIT_ORDER_TIMEOUT,
+                              "Waiting for the previous event times out, 
returns an error to let the sender retry and continue scheduling."));
+                  // TODO: Turn it to debug after GA
+                  LOGGER.info(
+                      "PipeConsensus-{}: Waiting for the previous event times 
out, current peek {}, current id {}",
+                      consensusPipeName,
+                      reqExecutionOrderBuffer.first().commitId,
+                      tCommitId);
+                  return new TPipeConsensusTransferResp(status);
                 }
-                return resp;
               }
             } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
@@ -1685,17 +1726,18 @@ public class PipeConsensusReceiver {
       }
     }
 
-    private TPipeConsensusTransferResp deprecatedResp(String msg) {
+    private TPipeConsensusTransferResp deprecatedResp(String msg, TCommitId 
tCommitId) {
       final TSStatus status =
           new TSStatus(
               RpcUtils.getStatus(
                   TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST,
                   String.format(
-                      "PipeConsensus receiver received a deprecated request, 
which may be sent before %s restarts. Consider to discard it",
+                      "PipeConsensus receiver received a deprecated request, which may be because %s. Consider discarding it.",
                       msg)));
       LOGGER.info(
-          "PipeConsensus-PipeName-{}: received a deprecated request, which may 
be sent before {} restarts. Consider to discard it",
+          "PipeConsensus-PipeName-{}: received a deprecated request-{}, which may be because {}.",
           consensusPipeName,
+          tCommitId,
           msg);
       return new TPipeConsensusTransferResp(status);
     }
@@ -1713,6 +1755,10 @@ public class PipeConsensusReceiver {
       return commitId.getDataNodeRebootTimes();
     }
 
+    public int getPipeTaskRestartTimes() {
+      return commitId.getPipeTaskRestartTimes();
+    }
+
     public long getReplicateIndex() {
       return commitId.getReplicateIndex();
     }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/IoTV2GlobalComponentContainer.java
similarity index 67%
rename from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
rename to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/IoTV2GlobalComponentContainer.java
index 6ba0136280d..f6f66b9c9ef 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/IoTV2GlobalComponentContainer.java
@@ -26,24 +26,34 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
 import org.apache.iotdb.commons.client.property.PipeConsensusClientProperty;
 import org.apache.iotdb.commons.client.sync.SyncPipeConsensusServiceClient;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 /**
- * This class is used to hold the syncClientManager and asyncClientManager 
used by pipeConsensus.
- * The purpose of designing this class is that both the consensus layer and 
the datanode layer of
- * pipeConsensus use clientManager.
+ * This class is used to hold the global component such as syncClientManager 
and asyncClientManager
+ * used by pipeConsensus. The purpose of designing this class is that both the 
consensus layer and
+ * the datanode layer of pipeConsensus use clientManager.
  *
  * <p>Note: we hope to create the corresponding clientManager only when the 
consensus is
  * pipeConsensus to avoid unnecessary overhead.
  */
-public class PipeConsensusClientMgrContainer {
+public class IoTV2GlobalComponentContainer {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTV2GlobalComponentContainer.class);
   private static final CommonConfig CONF = 
CommonDescriptor.getInstance().getConfig();
   private final PipeConsensusClientProperty config;
   private final IClientManager<TEndPoint, AsyncPipeConsensusServiceClient> 
asyncClientManager;
   private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient> 
syncClientManager;
+  private final ScheduledExecutorService backgroundTaskService;
 
-  private PipeConsensusClientMgrContainer() {
+  private IoTV2GlobalComponentContainer() {
     // load rpc client config
     this.config =
         PipeConsensusClientProperty.newBuilder()
@@ -57,6 +67,9 @@ public class PipeConsensusClientMgrContainer {
     this.syncClientManager =
         new IClientManager.Factory<TEndPoint, SyncPipeConsensusServiceClient>()
             .createClientManager(new 
SyncPipeConsensusServiceClientPoolFactory(config));
+    this.backgroundTaskService =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+            ThreadName.PIPE_CONSENSUS_BACKGROUND_TASK_EXECUTOR.getName());
   }
 
   public IClientManager<TEndPoint, AsyncPipeConsensusServiceClient> 
getGlobalAsyncClientManager() {
@@ -67,19 +80,35 @@ public class PipeConsensusClientMgrContainer {
     return this.syncClientManager;
   }
 
+  public ScheduledExecutorService getBackgroundTaskService() {
+    return this.backgroundTaskService;
+  }
+
+  public void stopBackgroundTaskService() {
+    backgroundTaskService.shutdownNow();
+    try {
+      if (!backgroundTaskService.awaitTermination(30, TimeUnit.SECONDS)) {
+        LOGGER.warn("IoTV2 background service did not terminate within {}s", 
30);
+      }
+    } catch (InterruptedException e) {
+      LOGGER.warn("Interrupted while waiting for IoTV2 background service to terminate", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
   private static class PipeConsensusClientMgrContainerHolder {
-    private static PipeConsensusClientMgrContainer INSTANCE;
+    private static IoTV2GlobalComponentContainer INSTANCE;
 
     private PipeConsensusClientMgrContainerHolder() {}
 
     public static void build() {
       if (INSTANCE == null) {
-        INSTANCE = new PipeConsensusClientMgrContainer();
+        INSTANCE = new IoTV2GlobalComponentContainer();
       }
     }
   }
 
-  public static PipeConsensusClientMgrContainer getInstance() {
+  public static IoTV2GlobalComponentContainer getInstance() {
     return PipeConsensusClientMgrContainerHolder.INSTANCE;
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index e208f0eb87a..076194dc3ae 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -106,6 +106,7 @@ public enum ThreadName {
   
ASYNC_DATANODE_PIPE_CONSENSUS_CLIENT_POOL("AsyncDataNodePipeConsensusServiceClientPool"),
   PIPE_CONSENSUS_DELETION_SERIALIZE("DAL-Serialize"),
   PIPE_CONSENSUS_TSFILE_WRITER_CHECKER("PipeConsensus-TsFileWriter-Checker"),
+  
PIPE_CONSENSUS_BACKGROUND_TASK_EXECUTOR("PipeConsensusBackgroundTaskExecutor"),
 
   // -------------------------- IoTConsensus --------------------------
   IOT_CONSENSUS_RPC_SERVICE("IoTConsensusRPC-Service"),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index 6b4ae19464a..a64dd760688 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -64,6 +64,8 @@ public abstract class EnrichedEvent implements Event {
   // Used in IoTConsensusV2
   protected long replicateIndexForIoTV2 = NO_COMMIT_ID;
   protected int rebootTimes = 0;
+  public static final long INITIAL_RETRY_INTERVAL_FOR_IOTV2 = 500L;
+  protected long retryInterval = INITIAL_RETRY_INTERVAL_FOR_IOTV2;
 
   protected final TreePattern treePattern;
   protected final TablePattern tablePattern;
@@ -429,6 +431,14 @@ public abstract class EnrichedEvent implements Event {
     return rebootTimes;
   }
 
+  public long getRetryInterval() {
+    return this.retryInterval;
+  }
+
+  public long setRetryInterval(final long retryInterval) {
+    return retryInterval;
+  }
+
   public CommitterKey getCommitterKey() {
     return committerKey;
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
index 83665780f77..779956b21d0 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverStatusHandler.java
@@ -102,6 +102,11 @@ public class PipeReceiverStatusHandler {
           exceptionMessage, Integer.MAX_VALUE);
     }
 
+    if (RetryUtils.notNeedRetryForConsensus(status.getCode())) {
+      LOGGER.info("IoTConsensusV2: will not retry. status: {}", status);
+      return;
+    }
+
     switch (status.getCode()) {
       case 200: // SUCCESS_STATUS
       case 400: // REDIRECTION_RECOMMEND
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
index c0ea52e6b74..2f3b6715686 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
@@ -19,8 +19,11 @@
 
 package org.apache.iotdb.commons.utils;
 
+import org.apache.iotdb.pipe.api.exception.PipeConsensusRetryWithIncreasingIntervalException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import java.net.ConnectException;
+
 public class RetryUtils {
 
   public interface CallableWithException<T, E extends Exception> {
@@ -33,6 +36,15 @@ public class RetryUtils {
         || statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode();
   }
 
+  public static boolean needRetryWithIncreasingInterval(Exception e) {
+    return e instanceof ConnectException
+        || e instanceof PipeConsensusRetryWithIncreasingIntervalException;
+  }
+
+  public static boolean notNeedRetryForConsensus(int statusCode) {
+    return statusCode == TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST.getStatusCode();
+  }
+
   public static final int MAX_RETRIES = 3;
 
   public static <T, E extends Exception> T retryOnException(

Reply via email to