This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c893101de1b Pipe: cache leader support batch mode (#12486)
c893101de1b is described below

commit c893101de1be6bc61ab38e6570ad1dc75d23efb1
Author: Zikun Ma <[email protected]>
AuthorDate: Mon May 13 23:00:49 2024 +0800

    Pipe: cache leader support batch mode (#12486)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../client/IoTDBDataNodeAsyncClientManager.java    |  10 +-
 .../IoTDBDataNodeCacheLeaderClientManager.java     |   2 +-
 .../client/IoTDBDataNodeSyncClientManager.java     |   9 +
 ...oTDBThriftAsyncPipeTransferBatchReqBuilder.java |  36 ----
 ...IoTDBThriftSyncPipeTransferBatchReqBuilder.java |  29 ---
 ...ferBatchReqBuilder.java => PipeEventBatch.java} |  61 +++---
 .../builder/PipeTransferBatchReqBuilder.java       | 219 +++++++--------------
 .../async/IoTDBDataRegionAsyncConnector.java       |  49 +++--
 .../PipeTransferTabletBatchEventHandler.java       |  19 +-
 .../thrift/sync/IoTDBDataRegionSyncConnector.java  |  40 ++--
 .../db/pipe/connector/util/LeaderCacheUtils.java   |  69 +++++++
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  51 ++++-
 .../crud/InsertMultiTabletsStatement.java          |   8 +
 13 files changed, 313 insertions(+), 289 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 729c3dbc8bc..62eeab93caa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -107,12 +108,15 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
   }
 
   public AsyncPipeDataTransferServiceClient borrowClient(String deviceId) 
throws Exception {
-    if (!useLeaderCache) {
+    if (!useLeaderCache || Objects.isNull(deviceId)) {
       return borrowClient();
     }
 
-    final TEndPoint endPoint = 
LEADER_CACHE_MANAGER.getLeaderEndPoint(deviceId);
-    if (endPoint == null) {
+    return borrowClient(LEADER_CACHE_MANAGER.getLeaderEndPoint(deviceId));
+  }
+
+  public AsyncPipeDataTransferServiceClient borrowClient(TEndPoint endPoint) 
throws Exception {
+    if (!useLeaderCache || Objects.isNull(endPoint)) {
       return borrowClient();
     }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
index b6a6511efc7..29e46e60635 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeCacheLeaderClientManager.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ConcurrentHashMap;
 
-interface IoTDBDataNodeCacheLeaderClientManager {
+public interface IoTDBDataNodeCacheLeaderClientManager {
 
   LeaderCacheManager LEADER_CACHE_MANAGER = new LeaderCacheManager();
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
index e281555e889..969d8d74d38 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeSyncClientManager.java
@@ -79,6 +79,15 @@ public class IoTDBDataNodeSyncClientManager extends 
IoTDBSyncClientManager
         : getClient();
   }
 
+  public Pair<IoTDBSyncClient, Boolean> getClient(TEndPoint endPoint) {
+    return useLeaderCache
+            && endPoint != null
+            && endPoint2ClientAndStatus.containsKey(endPoint)
+            && 
Boolean.TRUE.equals(endPoint2ClientAndStatus.get(endPoint).getRight())
+        ? endPoint2ClientAndStatus.get(endPoint)
+        : getClient();
+  }
+
   public void updateLeaderCache(String deviceId, TEndPoint endPoint) {
     if (!useLeaderCache) {
       return;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
deleted file mode 100644
index 394bc0f5b0c..00000000000
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
-
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class IoTDBThriftAsyncPipeTransferBatchReqBuilder extends 
PipeTransferBatchReqBuilder {
-
-  public IoTDBThriftAsyncPipeTransferBatchReqBuilder(PipeParameters 
parameters) {
-    super(parameters);
-  }
-
-  public List<Long> deepCopyRequestCommitIds() {
-    return new ArrayList<>(requestCommitIds);
-  }
-}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
deleted file mode 100644
index 95b03927b4d..00000000000
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
-
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
-
-public class IoTDBThriftSyncPipeTransferBatchReqBuilder extends 
PipeTransferBatchReqBuilder {
-
-  public IoTDBThriftSyncPipeTransferBatchReqBuilder(final PipeParameters 
parameters) {
-    super(parameters);
-  }
-}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java
similarity index 76%
copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java
index f7a17f3d7fc..74b49ce7272 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 
@@ -40,49 +39,31 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_KEY;
-import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
+public class PipeEventBatch implements AutoCloseable {
 
-public abstract class PipeTransferBatchReqBuilder implements AutoCloseable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeEventBatch.class);
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTransferBatchReqBuilder.class);
+  private final List<Event> events = new ArrayList<>();
+  private final List<Long> requestCommitIds = new ArrayList<>();
 
-  protected final List<Event> events = new ArrayList<>();
-  protected final List<Long> requestCommitIds = new ArrayList<>();
-
-  protected final List<ByteBuffer> binaryBuffers = new ArrayList<>();
-  protected final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
-  protected final List<ByteBuffer> tabletBuffers = new ArrayList<>();
+  private final List<ByteBuffer> binaryBuffers = new ArrayList<>();
+  private final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
+  private final List<ByteBuffer> tabletBuffers = new ArrayList<>();
 
   // limit in delayed time
-  protected final int maxDelayInMs;
-  protected long firstEventProcessingTime = Long.MIN_VALUE;
+  private final int maxDelayInMs;
+  private long firstEventProcessingTime = Long.MIN_VALUE;
 
   // limit in buffer size
-  protected final PipeMemoryBlock allocatedMemoryBlock;
-  protected long totalBufferSize = 0;
-
-  protected PipeTransferBatchReqBuilder(final PipeParameters parameters) {
-    maxDelayInMs =
-        parameters.getIntOrDefault(
-                Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY, 
SINK_IOTDB_BATCH_DELAY_KEY),
-                CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE)
-            * 1000;
-
-    final long requestMaxBatchSizeInBytes =
-        parameters.getLongOrDefault(
-            Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, 
SINK_IOTDB_BATCH_SIZE_KEY),
-            CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE);
-
-    allocatedMemoryBlock =
+  private final PipeMemoryBlock allocatedMemoryBlock;
+  private long totalBufferSize = 0;
+
+  public PipeEventBatch(int maxDelayInMs, long requestMaxBatchSizeInBytes) {
+    this.maxDelayInMs = maxDelayInMs;
+    this.allocatedMemoryBlock =
         PipeResourceManager.memory()
             .tryAllocate(requestMaxBatchSizeInBytes)
             .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 0))
@@ -107,10 +88,10 @@ public abstract class PipeTransferBatchReqBuilder 
implements AutoCloseable {
   }
 
   /**
-   * Try offer {@link Event} into cache if the given {@link Event} is not 
duplicated.
+   * Try offer {@link Event} into batch if the given {@link Event} is not 
duplicated.
    *
    * @param event the given {@link Event}
-   * @return {@link true} if the batch can be transferred
+   * @return {@code true} if the batch can be transferred
    */
   public synchronized boolean onEvent(final TabletInsertionEvent event)
       throws IOException, WALPipeException {
@@ -163,7 +144,7 @@ public abstract class PipeTransferBatchReqBuilder 
implements AutoCloseable {
         binaryBuffers, insertNodeBuffers, tabletBuffers);
   }
 
-  protected long getMaxBatchSizeInBytes() {
+  private long getMaxBatchSizeInBytes() {
     return allocatedMemoryBlock.getMemoryUsageInBytes();
   }
 
@@ -175,7 +156,11 @@ public abstract class PipeTransferBatchReqBuilder 
implements AutoCloseable {
     return new ArrayList<>(events);
   }
 
-  protected int buildTabletInsertionBuffer(final TabletInsertionEvent event)
+  public List<Long> deepCopyRequestCommitIds() {
+    return new ArrayList<>(requestCommitIds);
+  }
+
+  private int buildTabletInsertionBuffer(final TabletInsertionEvent event)
       throws IOException, WALPipeException {
     final ByteBuffer buffer;
     if (event instanceof PipeInsertNodeTabletInsertionEvent) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
index f7a17f3d7fc..2b19bb1275d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
@@ -19,212 +19,145 @@
 
 package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
+import 
org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeCacheLeaderClientManager;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
-import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 
-import org.apache.tsfile.utils.PublicBAOS;
-import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LEADER_CACHE_ENABLE_KEY;
 
-public abstract class PipeTransferBatchReqBuilder implements AutoCloseable {
+public class PipeTransferBatchReqBuilder implements AutoCloseable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTransferBatchReqBuilder.class);
 
-  protected final List<Event> events = new ArrayList<>();
-  protected final List<Long> requestCommitIds = new ArrayList<>();
+  private final boolean useLeaderCache;
 
-  protected final List<ByteBuffer> binaryBuffers = new ArrayList<>();
-  protected final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
-  protected final List<ByteBuffer> tabletBuffers = new ArrayList<>();
+  private final int requestMaxDelayInMs;
+  private final long requestMaxBatchSizeInBytes;
 
-  // limit in delayed time
-  protected final int maxDelayInMs;
-  protected long firstEventProcessingTime = Long.MIN_VALUE;
+  // If the leader cache is disabled (or unable to find the endpoint of event 
in the leader cache),
+  // the event will be stored in the default batch.
+  private final PipeEventBatch defaultBatch;
+  // If the leader cache is enabled, the batch will be divided by the leader 
endpoint,
+  // each endpoint has a batch.
+  private final Map<TEndPoint, PipeEventBatch> endPointToBatch = new 
HashMap<>();
 
-  // limit in buffer size
-  protected final PipeMemoryBlock allocatedMemoryBlock;
-  protected long totalBufferSize = 0;
+  public PipeTransferBatchReqBuilder(final PipeParameters parameters) {
+    useLeaderCache =
+        parameters.getBooleanOrDefault(
+            Arrays.asList(SINK_LEADER_CACHE_ENABLE_KEY, 
CONNECTOR_LEADER_CACHE_ENABLE_KEY),
+            CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE);
 
-  protected PipeTransferBatchReqBuilder(final PipeParameters parameters) {
-    maxDelayInMs =
+    requestMaxDelayInMs =
         parameters.getIntOrDefault(
                 Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY, 
SINK_IOTDB_BATCH_DELAY_KEY),
                 CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE)
             * 1000;
-
-    final long requestMaxBatchSizeInBytes =
+    requestMaxBatchSizeInBytes =
         parameters.getLongOrDefault(
             Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, 
SINK_IOTDB_BATCH_SIZE_KEY),
             CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE);
 
-    allocatedMemoryBlock =
-        PipeResourceManager.memory()
-            .tryAllocate(requestMaxBatchSizeInBytes)
-            .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 0))
-            .setShrinkCallback(
-                (oldMemory, newMemory) ->
-                    LOGGER.info(
-                        "The batch size limit has shrunk from {} to {}.", 
oldMemory, newMemory))
-            .setExpandMethod(
-                oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, 
requestMaxBatchSizeInBytes))
-            .setExpandCallback(
-                (oldMemory, newMemory) ->
-                    LOGGER.info(
-                        "The batch size limit has expanded from {} to {}.", 
oldMemory, newMemory));
-
-    if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) {
-      LOGGER.info(
-          "PipeTransferBatchReqBuilder: the max batch size is adjusted from {} 
to {} due to the "
-              + "memory restriction",
-          requestMaxBatchSizeInBytes,
-          getMaxBatchSizeInBytes());
-    }
+    this.defaultBatch = new PipeEventBatch(requestMaxDelayInMs, 
requestMaxBatchSizeInBytes);
   }
 
   /**
-   * Try offer {@link Event} into cache if the given {@link Event} is not 
duplicated.
+   * Try offer {@link Event} into the corresponding batch if the given {@link 
Event} is not
+   * duplicated.
    *
    * @param event the given {@link Event}
-   * @return {@link true} if the batch can be transferred
+   * @return {@link Pair}<{@link TEndPoint}, {@link PipeEventBatch}> not null 
means this {@link
+   *     PipeEventBatch} can be transferred. the first element is the leader 
endpoint to transfer to
+   *     (might be null), the second element is the batch to be transferred.
    */
-  public synchronized boolean onEvent(final TabletInsertionEvent event)
+  public synchronized Pair<TEndPoint, PipeEventBatch> onEvent(final 
TabletInsertionEvent event)
       throws IOException, WALPipeException {
     if (!(event instanceof EnrichedEvent)) {
-      return false;
+      LOGGER.warn(
+          "Unsupported event {} type {} when building transfer request", 
event, event.getClass());
+      return null;
     }
 
-    final long requestCommitId = ((EnrichedEvent) event).getCommitId();
-
-    // The deduplication logic here is to avoid the accumulation of the same 
event in a batch when
-    // retrying.
-    if ((events.isEmpty() || !events.get(events.size() - 1).equals(event))) {
-      // We increase the reference count for this event to determine if the 
event may be released.
-      if (((EnrichedEvent) event)
-          
.increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName())) {
-        events.add(event);
-        requestCommitIds.add(requestCommitId);
-
-        final int bufferSize = buildTabletInsertionBuffer(event);
-        totalBufferSize += bufferSize;
-
-        if (firstEventProcessingTime == Long.MIN_VALUE) {
-          firstEventProcessingTime = System.currentTimeMillis();
-        }
-      } else {
-        ((EnrichedEvent) event)
-            
.decreaseReferenceCount(PipeTransferBatchReqBuilder.class.getName(), false);
-      }
+    if (!useLeaderCache) {
+      return defaultBatch.onEvent(event) ? new Pair<>(null, defaultBatch) : 
null;
     }
 
-    return totalBufferSize >= getMaxBatchSizeInBytes()
-        || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs;
-  }
-
-  public synchronized void onSuccess() {
-    binaryBuffers.clear();
-    insertNodeBuffers.clear();
-    tabletBuffers.clear();
-
-    events.clear();
-    requestCommitIds.clear();
+    String deviceId = null;
+    if (event instanceof PipeRawTabletInsertionEvent) {
+      deviceId = ((PipeRawTabletInsertionEvent) event).getDeviceId();
+    } else if (event instanceof PipeInsertNodeTabletInsertionEvent) {
+      final InsertNode insertNode =
+          ((PipeInsertNodeTabletInsertionEvent) 
event).getInsertNodeViaCacheIfPossible();
+      if (Objects.nonNull(insertNode)) {
+        deviceId = insertNode.getDevicePath().getFullPath();
+      }
+    }
 
-    firstEventProcessingTime = Long.MIN_VALUE;
+    if (Objects.isNull(deviceId)) {
+      return defaultBatch.onEvent(event) ? new Pair<>(null, defaultBatch) : 
null;
+    }
 
-    totalBufferSize = 0;
-  }
+    final TEndPoint endPoint =
+        
IoTDBDataNodeCacheLeaderClientManager.LEADER_CACHE_MANAGER.getLeaderEndPoint(deviceId);
+    if (Objects.isNull(endPoint)) {
+      return defaultBatch.onEvent(event) ? new Pair<>(null, defaultBatch) : 
null;
+    }
 
-  public PipeTransferTabletBatchReq toTPipeTransferReq() throws IOException {
-    return PipeTransferTabletBatchReq.toTPipeTransferReq(
-        binaryBuffers, insertNodeBuffers, tabletBuffers);
+    final PipeEventBatch batch =
+        endPointToBatch.computeIfAbsent(
+            endPoint, k -> new PipeEventBatch(requestMaxDelayInMs, 
requestMaxBatchSizeInBytes));
+    return batch.onEvent(event) ? new Pair<>(endPoint, batch) : null;
   }
 
-  protected long getMaxBatchSizeInBytes() {
-    return allocatedMemoryBlock.getMemoryUsageInBytes();
+  /** Get all batches that have at least 1 event. */
+  public synchronized List<Pair<TEndPoint, PipeEventBatch>> 
getAllNonEmptyBatches() {
+    final List<Pair<TEndPoint, PipeEventBatch>> nonEmptyBatches = new 
ArrayList<>();
+    if (!defaultBatch.isEmpty()) {
+      nonEmptyBatches.add(new Pair<>(null, defaultBatch));
+    }
+    endPointToBatch.forEach(
+        (endPoint, batch) -> {
+          if (!batch.isEmpty()) {
+            nonEmptyBatches.add(new Pair<>(endPoint, batch));
+          }
+        });
+    return nonEmptyBatches;
   }
 
   public boolean isEmpty() {
-    return binaryBuffers.isEmpty() && insertNodeBuffers.isEmpty() && 
tabletBuffers.isEmpty();
-  }
-
-  public List<Event> deepCopyEvents() {
-    return new ArrayList<>(events);
-  }
-
-  protected int buildTabletInsertionBuffer(final TabletInsertionEvent event)
-      throws IOException, WALPipeException {
-    final ByteBuffer buffer;
-    if (event instanceof PipeInsertNodeTabletInsertionEvent) {
-      final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent =
-          (PipeInsertNodeTabletInsertionEvent) event;
-      // Read the bytebuffer from the wal file and transfer it directly 
without serializing or
-      // deserializing if possible
-      final InsertNode insertNode =
-          pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
-      if (Objects.isNull(insertNode)) {
-        buffer = pipeInsertNodeTabletInsertionEvent.getByteBuffer();
-        binaryBuffers.add(buffer);
-      } else {
-        buffer = insertNode.serializeToByteBuffer();
-        insertNodeBuffers.add(buffer);
-      }
-    } else {
-      final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
-          (PipeRawTabletInsertionEvent) event;
-      try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
-          final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
-        pipeRawTabletInsertionEvent.convertToTablet().serialize(outputStream);
-        ReadWriteIOUtils.write(pipeRawTabletInsertionEvent.isAligned(), 
outputStream);
-        buffer = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
-      }
-      tabletBuffers.add(buffer);
-    }
-    return buffer.limit();
+    return defaultBatch.isEmpty()
+        && endPointToBatch.values().stream().allMatch(PipeEventBatch::isEmpty);
   }
 
   @Override
   public synchronized void close() {
-    clearEventsReferenceCount(PipeTransferBatchReqBuilder.class.getName());
-    allocatedMemoryBlock.close();
-  }
-
-  public void decreaseEventsReferenceCount(final String holderMessage, final 
boolean shouldReport) {
-    for (final Event event : events) {
-      if (event instanceof EnrichedEvent) {
-        ((EnrichedEvent) event).decreaseReferenceCount(holderMessage, 
shouldReport);
-      }
-    }
-  }
-
-  public void clearEventsReferenceCount(final String holderMessage) {
-    for (final Event event : events) {
-      if (event instanceof EnrichedEvent) {
-        ((EnrichedEvent) event).clearReferenceCount(holderMessage);
-      }
-    }
+    defaultBatch.close();
+    endPointToBatch.values().forEach(PipeEventBatch::close);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index aef8f2f340a..74cbfe2e39b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -24,7 +24,8 @@ import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeAsyncClientManager;
-import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftAsyncPipeTransferBatchReqBuilder;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.PipeEventBatch;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.PipeTransferBatchReqBuilder;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
@@ -48,6 +49,7 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
+import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,7 +87,7 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
   private final IoTDBDataRegionSyncConnector retryConnector = new 
IoTDBDataRegionSyncConnector();
   private final BlockingQueue<Event> retryEventQueue = new 
LinkedBlockingQueue<>();
 
-  private IoTDBThriftAsyncPipeTransferBatchReqBuilder tabletBatchBuilder;
+  private PipeTransferBatchReqBuilder tabletBatchBuilder;
 
   private final AtomicBoolean isClosed = new AtomicBoolean(false);
 
@@ -126,7 +128,7 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
             loadBalanceStrategy);
 
     if (isTabletBatchModeEnabled) {
-      tabletBatchBuilder = new 
IoTDBThriftAsyncPipeTransferBatchReqBuilder(parameters);
+      tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
     }
   }
 
@@ -160,13 +162,13 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
   private void transferWithoutCheck(final TabletInsertionEvent 
tabletInsertionEvent)
       throws Exception {
     if (isTabletBatchModeEnabled) {
-      if (tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
-        final PipeTransferTabletBatchEventHandler 
pipeTransferTabletBatchEventHandler =
-            new PipeTransferTabletBatchEventHandler(tabletBatchBuilder, this);
-
-        transfer(pipeTransferTabletBatchEventHandler);
-
-        tabletBatchBuilder.onSuccess();
+      final Pair<TEndPoint, PipeEventBatch> endPointAndBatch =
+          tabletBatchBuilder.onEvent(tabletInsertionEvent);
+      if (Objects.nonNull(endPointAndBatch)) {
+        transfer(
+            endPointAndBatch.getLeft(),
+            new 
PipeTransferTabletBatchEventHandler(endPointAndBatch.getRight(), this));
+        endPointAndBatch.getRight().onSuccess();
       }
     } else {
       if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
@@ -191,7 +193,9 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
             new PipeTransferTabletInsertNodeEventHandler(
                 pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this);
 
-        transfer(pipeTransferInsertNodeReqHandler);
+        transfer(
+            Objects.nonNull(insertNode) ? 
insertNode.getDevicePath().getFullPath() : null,
+            pipeTransferInsertNodeReqHandler);
       } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
         final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
             (PipeRawTabletInsertionEvent) tabletInsertionEvent;
@@ -211,16 +215,17 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
             new PipeTransferTabletRawEventHandler(
                 pipeRawTabletInsertionEvent, pipeTransferTabletRawReq, this);
 
-        transfer(pipeTransferTabletReqHandler);
+        transfer(pipeRawTabletInsertionEvent.getDeviceId(), 
pipeTransferTabletReqHandler);
       }
     }
   }
 
   private void transfer(
+      final TEndPoint endPoint,
       final PipeTransferTabletBatchEventHandler 
pipeTransferTabletBatchEventHandler) {
     AsyncPipeDataTransferServiceClient client = null;
     try {
-      client = clientManager.borrowClient();
+      client = clientManager.borrowClient(endPoint);
       pipeTransferTabletBatchEventHandler.transfer(client);
     } catch (final Exception ex) {
       logOnClientException(client, ex);
@@ -229,10 +234,11 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
   }
 
   private void transfer(
+      final String deviceId,
       final PipeTransferTabletInsertNodeEventHandler 
pipeTransferInsertNodeReqHandler) {
     AsyncPipeDataTransferServiceClient client = null;
     try {
-      client = clientManager.borrowClient();
+      client = clientManager.borrowClient(deviceId);
       pipeTransferInsertNodeReqHandler.transfer(client);
     } catch (final Exception ex) {
       logOnClientException(client, ex);
@@ -240,10 +246,11 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     }
   }
 
-  private void transfer(final PipeTransferTabletRawEventHandler 
pipeTransferTabletReqHandler) {
+  private void transfer(
+      final String deviceId, final PipeTransferTabletRawEventHandler 
pipeTransferTabletReqHandler) {
     AsyncPipeDataTransferServiceClient client = null;
     try {
-      client = clientManager.borrowClient();
+      client = clientManager.borrowClient(deviceId);
       pipeTransferTabletReqHandler.transfer(client);
     } catch (final Exception ex) {
       logOnClientException(client, ex);
@@ -399,9 +406,13 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
       return;
     }
 
-    transfer(new PipeTransferTabletBatchEventHandler(tabletBatchBuilder, 
this));
-
-    tabletBatchBuilder.onSuccess();
+    for (final Pair<TEndPoint, PipeEventBatch> endPointAndBatch :
+        tabletBatchBuilder.getAllNonEmptyBatches()) {
+      transfer(
+          endPointAndBatch.getLeft(),
+          new PipeTransferTabletBatchEventHandler(endPointAndBatch.getRight(), 
this));
+      endPointAndBatch.getRight().onSuccess();
+    }
   }
 
   /**
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index aa867ca05b1..500ed4cab98 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -19,11 +19,13 @@
 
 package org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftAsyncPipeTransferBatchReqBuilder;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.PipeEventBatch;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
+import org.apache.iotdb.db.pipe.connector.util.LeaderCacheUtils;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -32,6 +34,7 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,13 +54,12 @@ public class PipeTransferTabletBatchEventHandler implements 
AsyncMethodCallback<
   private final IoTDBDataRegionAsyncConnector connector;
 
   public PipeTransferTabletBatchEventHandler(
-      final IoTDBThriftAsyncPipeTransferBatchReqBuilder batchBuilder,
-      final IoTDBDataRegionAsyncConnector connector)
+      final PipeEventBatch batch, final IoTDBDataRegionAsyncConnector 
connector)
       throws IOException {
     // Deep copy to keep Ids' and events' reference
-    requestCommitIds = batchBuilder.deepCopyRequestCommitIds();
-    events = batchBuilder.deepCopyEvents();
-    req = batchBuilder.toTPipeTransferReq();
+    requestCommitIds = batch.deepCopyRequestCommitIds();
+    events = batch.deepCopyEvents();
+    req = batch.toTPipeTransferReq();
 
     this.connector = connector;
   }
@@ -83,6 +85,11 @@ public class PipeTransferTabletBatchEventHandler implements 
AsyncMethodCallback<
             .statusHandler()
             .handle(status, response.getStatus().getMessage(), 
events.toString());
       }
+      for (final Pair<String, TEndPoint> redirectPair :
+          LeaderCacheUtils.parseRecommendedRedirections(status)) {
+        connector.updateLeaderCache(redirectPair.getLeft(), 
redirectPair.getRight());
+      }
+
       for (final Event event : events) {
         if (event instanceof EnrichedEvent) {
           ((EnrichedEvent) event)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index f873cc27646..568baa13144 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -19,11 +19,13 @@
 
 package org.apache.iotdb.db.pipe.connector.protocol.thrift.sync;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftSyncPipeTransferBatchReqBuilder;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.PipeEventBatch;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.PipeTransferBatchReqBuilder;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
@@ -31,6 +33,7 @@ import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
+import org.apache.iotdb.db.pipe.connector.util.LeaderCacheUtils;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
@@ -53,12 +56,13 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Objects;
 
 public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBDataRegionSyncConnector.class);
 
-  private IoTDBThriftSyncPipeTransferBatchReqBuilder tabletBatchBuilder;
+  private PipeTransferBatchReqBuilder tabletBatchBuilder;
 
   @Override
   public void customize(
@@ -68,7 +72,7 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
 
     // tablet batch mode configuration
     if (isTabletBatchModeEnabled) {
-      tabletBatchBuilder = new 
IoTDBThriftSyncPipeTransferBatchReqBuilder(parameters);
+      tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
     }
   }
 
@@ -99,8 +103,10 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
 
     try {
       if (isTabletBatchModeEnabled) {
-        if (tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
-          doTransfer();
+        final Pair<TEndPoint, PipeEventBatch> endPointAndBatch =
+            tabletBatchBuilder.onEvent(tabletInsertionEvent);
+        if (Objects.nonNull(endPointAndBatch)) {
+          doTransfer(endPointAndBatch);
         }
       } else {
         if (tabletInsertionEvent instanceof 
PipeInsertNodeTabletInsertionEvent) {
@@ -163,11 +169,14 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
     }
   }
 
-  private void doTransfer() {
-    final Pair<IoTDBSyncClient, Boolean> clientAndStatus = 
clientManager.getClient();
+  private void doTransfer(Pair<TEndPoint, PipeEventBatch> endPointAndBatch) {
+    final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
+        clientManager.getClient(endPointAndBatch.getLeft());
+    final PipeEventBatch batchToTransfer = endPointAndBatch.getRight();
+
     final TPipeTransferResp resp;
     try {
-      resp = 
clientAndStatus.getLeft().pipeTransfer(tabletBatchBuilder.toTPipeTransferReq());
+      resp = 
clientAndStatus.getLeft().pipeTransfer(batchToTransfer.toTPipeTransferReq());
     } catch (final Exception e) {
       clientAndStatus.setRight(false);
       throw new PipeConnectionException(
@@ -182,12 +191,21 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
       receiverStatusHandler.handle(
           resp.getStatus(),
           String.format("Transfer PipeTransferTabletBatchReq error, result 
status %s", resp.status),
-          tabletBatchBuilder.deepCopyEvents().toString());
+          batchToTransfer.deepCopyEvents().toString());
     }
 
-    tabletBatchBuilder.decreaseEventsReferenceCount(
+    for (final Pair<String, TEndPoint> redirectPair :
+        LeaderCacheUtils.parseRecommendedRedirections(status)) {
+      clientManager.updateLeaderCache(redirectPair.getLeft(), 
redirectPair.getRight());
+    }
+
+    batchToTransfer.decreaseEventsReferenceCount(
         IoTDBDataRegionSyncConnector.class.getName(), true);
-    tabletBatchBuilder.onSuccess();
+    batchToTransfer.onSuccess();
+  }
+
+  private void doTransfer() {
+    tabletBatchBuilder.getAllNonEmptyBatches().forEach(this::doTransfer);
   }
 
   private void doTransferWrapper(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/LeaderCacheUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/LeaderCacheUtils.java
new file mode 100644
index 00000000000..e31fec9b57b
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/LeaderCacheUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.util;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.tsfile.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class LeaderCacheUtils {
+
+  private LeaderCacheUtils() {
+    // Do nothing
+  }
+
+  /**
+   * Get all redirection recommendations returned after transferring a batch event, which 
are used to update the leader cache.
+   *
+   * @param status is the returned status after transferring a batch event.
+   * @return a list of pairs, each pair contains a device path and its 
redirect endpoint.
+   */
+  public static List<Pair<String, TEndPoint>> 
parseRecommendedRedirections(TSStatus status) {
+    // If there is no exception, there should be 2 sub-statuses, one for 
InsertRowsStatement and one
+    // for InsertMultiTabletsStatement (see 
IoTDBDataNodeReceiver#handleTransferTabletBatch).
+    final List<Pair<String, TEndPoint>> redirectList = new ArrayList<>();
+
+    if (status.getSubStatusSize() != 2) {
+      return redirectList;
+    }
+
+    for (final TSStatus subStatus : status.getSubStatus()) {
+      if (subStatus.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+        continue;
+      }
+
+      for (final TSStatus innerSubStatus : subStatus.getSubStatus()) {
+        if (innerSubStatus.isSetRedirectNode()) {
+          // The receiver is expected to put the device path into the message field of
+          // each sub-status that carries a redirect node, so we read it from getMessage().
+          redirectList.add(
+              new Pair<>(innerSubStatus.getMessage(), 
innerSubStatus.getRedirectNode()));
+        }
+      }
+    }
+
+    return redirectList;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index c9626c3ba81..c5f1d5fa99d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -245,10 +245,10 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
             Stream.of(
                     statementPair.getLeft().isEmpty()
                         ? RpcUtils.SUCCESS_STATUS
-                        : 
executeStatementAndClassifyExceptions(statementPair.getLeft()),
+                        : 
executeStatementAndAddRedirectInfo(statementPair.getLeft()),
                     statementPair.getRight().isEmpty()
                         ? RpcUtils.SUCCESS_STATUS
-                        : 
executeStatementAndClassifyExceptions(statementPair.getRight()))
+                        : 
executeStatementAndAddRedirectInfo(statementPair.getRight()))
                 .collect(Collectors.toList())));
   }
 
@@ -353,10 +353,55 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     return configReceiverId.get();
   }
 
+  /**
+   * For {@link InsertRowsStatement} and {@link InsertMultiTabletsStatement}, 
the returned {@link
+   * TSStatus} will use sub-status to record the endpoint for redirection. 
Each sub-status records
+   * the redirection endpoint for one device path, and the order is the same 
as the order of the
+   * device paths in the statement. However, this order is not guaranteed to 
be the same as in the
+   * request. So for each sub-status which needs to redirect, we record the 
device path using the
+   * message field.
+   */
+  private TSStatus executeStatementAndAddRedirectInfo(final 
InsertBaseStatement statement) {
+    final TSStatus result = executeStatementAndClassifyExceptions(statement);
+
+    if (result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+        && result.getSubStatusSize() > 0) {
+      final List<PartialPath> devicePaths;
+      if (statement instanceof InsertRowsStatement) {
+        devicePaths = ((InsertRowsStatement) statement).getDevicePaths();
+      } else if (statement instanceof InsertMultiTabletsStatement) {
+        devicePaths = ((InsertMultiTabletsStatement) 
statement).getDevicePaths();
+      } else {
+        LOGGER.warn(
+            "Receiver id = {}: Unsupported statement type {} for redirection.",
+            receiverId.get(),
+            statement);
+        return result;
+      }
+
+      if (devicePaths.size() == result.getSubStatusSize()) {
+        for (int i = 0; i < devicePaths.size(); ++i) {
+          if (result.getSubStatus().get(i).isSetRedirectNode()) {
+            
result.getSubStatus().get(i).setMessage(devicePaths.get(i).getFullPath());
+          }
+        }
+      } else {
+        LOGGER.warn(
+            "Receiver id = {}: The number of device paths is not equal to 
sub-status in statement {}: {}.",
+            receiverId.get(),
+            statement,
+            result);
+      }
+    }
+
+    return result;
+  }
+
   private TSStatus executeStatementAndClassifyExceptions(final Statement 
statement) {
     try {
       final TSStatus result = executeStatement(statement);
-      if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          || result.getCode() == 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
         return result;
       } else {
         LOGGER.warn(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
index eb4d2f56311..bbc00e1975b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
@@ -49,6 +49,14 @@ public class InsertMultiTabletsStatement extends 
InsertBaseStatement {
     this.insertTabletStatementList = insertTabletStatementList;
   }
 
+  public List<PartialPath> getDevicePaths() {
+    List<PartialPath> partialPaths = new ArrayList<>();
+    for (InsertTabletStatement insertTabletStatement : 
insertTabletStatementList) {
+      partialPaths.add(insertTabletStatement.devicePath);
+    }
+    return partialPaths;
+  }
+
   public List<String[]> getMeasurementsList() {
     List<String[]> measurementsList = new ArrayList<>();
     for (InsertTabletStatement insertTabletStatement : 
insertTabletStatementList) {

Reply via email to