This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 23340404c99 [IOTDB-6080] Pipe: Introduce batch mode for tablet transferring (#10912)
23340404c99 is described below

commit 23340404c99422382925c2094d4f1b120406d9cb
Author: Caideyipi <[email protected]>
AuthorDate: Tue Aug 22 16:28:05 2023 +0800

    [IOTDB-6080] Pipe: Introduce batch mode for tablet transferring (#10912)
    
    Co-authored-by: xuanronaldo <[email protected]>
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java  |   1 +
 .../config/constant/PipeConnectorConstant.java     |  11 ++
 .../payload/evolvable/PipeRequestType.java         |   2 +
 ...oTDBThriftAsyncPipeTransferBatchReqBuilder.java |  98 ++++++++++++
 ...IoTDBThriftSyncPipeTransferBatchReqBuilder.java |  89 +++++++++++
 .../builder/PipeTransferBatchReqBuilder.java       |  64 ++++++++
 .../evolvable/request/PipeTransferBatchReq.java    | 172 +++++++++++++++++++++
 .../request/PipeTransferFilePieceReq.java          |   7 +-
 .../evolvable/request/PipeTransferFileSealReq.java |   4 +-
 .../request/PipeTransferHandshakeReq.java          |   2 +-
 .../request/PipeTransferInsertNodeReq.java         |   2 +-
 .../evolvable/request/PipeTransferTabletReq.java   |  37 ++++-
 .../db/pipe/connector/protocol/IoTDBConnector.java |  11 +-
 .../protocol/airgap/IoTDBAirGapConnector.java      |   8 +
 .../thrift/async/IoTDBThriftAsyncConnector.java    | 117 +++++++++++---
 ...eTransferTabletBatchInsertionEventHandler.java} |  63 ++++----
 .../PipeTransferTabletInsertionEventHandler.java   |   2 +-
 .../thrift/sync/IoTDBThriftSyncConnector.java      |  56 +++++--
 .../receiver/thrift/IoTDBThriftReceiverV1.java     |  93 +++++++----
 .../planner/plan/node/write/InsertRowsNode.java    |   9 +-
 20 files changed, 737 insertions(+), 111 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
index 0e48b93e614..60bbe095b4f 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeDataSyncIT.java
@@ -90,6 +90,7 @@ public class IoTDBPipeDataSyncIT {
       extractorAttributes.put("extractor.realtime.mode", "log");
 
       connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enabled", "false");
       connectorAttributes.put("connector.ip", receiverIp);
       connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
index 96477c3a3aa..2f65a39f86c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/config/constant/PipeConnectorConstant.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.pipe.config.constant;
 
+import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
+
 public class PipeConnectorConstant {
 
   public static final String CONNECTOR_KEY = "connector";
@@ -27,6 +29,15 @@ public class PipeConnectorConstant {
   public static final String CONNECTOR_IOTDB_PORT_KEY = "connector.port";
   public static final String CONNECTOR_IOTDB_NODE_URLS_KEY = 
"connector.node-urls";
 
+  public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLED_KEY = 
"connector.batch.enabled";
+  public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE 
= true;
+
+  public static final String CONNECTOR_IOTDB_BATCH_DELAY_KEY = 
"connector.batch.max-delay-seconds";
+  public static final int CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE = 10;
+
+  public static final String CONNECTOR_IOTDB_BATCH_SIZE_KEY = 
"connector.batch.size-bytes";
+  public static final long CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE = 16 * MB;
+
   public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user";
   public static final String CONNECTOR_IOTDB_USER_DEFAULT_VALUE = "root";
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
index 7d901518a7d..ed9c1384eac 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
@@ -31,6 +31,8 @@ public enum PipeRequestType {
 
   TRANSFER_FILE_PIECE((short) 4),
   TRANSFER_FILE_SEAL((short) 5),
+
+  TRANSFER_BATCH((short) 6),
   ;
 
   private final short type;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
new file mode 100644
index 00000000000..1d209adc887
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftAsyncPipeTransferBatchReqBuilder.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
+
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class IoTDBThriftAsyncPipeTransferBatchReqBuilder extends 
PipeTransferBatchReqBuilder {
+
+  protected final List<Long> requestCommitIds = new ArrayList<>();
+
+  public IoTDBThriftAsyncPipeTransferBatchReqBuilder(PipeParameters 
parameters) {
+    super(parameters);
+  }
+
+  /**
+   * Try offer event into cache if the given event is not duplicated.
+   *
+   * @param event the given event
+   * @return true if the batch can be transferred
+   */
+  public boolean onEvent(TabletInsertionEvent event, long requestCommitId)
+      throws IOException, WALPipeException {
+    final TPipeTransferReq req =
+        event instanceof PipeInsertNodeTabletInsertionEvent
+            ? PipeTransferInsertNodeReq.toTPipeTransferReq(
+                ((PipeInsertNodeTabletInsertionEvent) event).getInsertNode())
+            : PipeTransferTabletReq.toTPipeTransferReq(
+                ((PipeRawTabletInsertionEvent) event).convertToTablet(),
+                ((PipeRawTabletInsertionEvent) event).isAligned());
+    if (events.isEmpty() || !events.get(events.size() - 1).equals(event)) {
+      reqs.add(req);
+
+      if (event instanceof EnrichedEvent) {
+        ((EnrichedEvent) 
event).increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName());
+      }
+      events.add(event);
+      requestCommitIds.add(requestCommitId);
+
+      if (firstEventProcessingTime == Long.MIN_VALUE) {
+        firstEventProcessingTime = System.currentTimeMillis();
+      }
+
+      bufferSize += req.getBody().length;
+    }
+
+    return bufferSize >= maxBatchSizeInBytes
+        || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs;
+  }
+
+  public void onSuccess() {
+    reqs.clear();
+
+    events.clear();
+    requestCommitIds.clear();
+
+    firstEventProcessingTime = Long.MIN_VALUE;
+
+    bufferSize = 0;
+  }
+
+  public List<Event> deepcopyEvents() {
+    return new ArrayList<>(events);
+  }
+
+  public List<Long> deepcopyRequestCommitIds() {
+    return new ArrayList<>(requestCommitIds);
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
new file mode 100644
index 00000000000..c60f8d241f3
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/IoTDBThriftSyncPipeTransferBatchReqBuilder.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
+
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
+import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector;
+import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import java.io.IOException;
+
+public class IoTDBThriftSyncPipeTransferBatchReqBuilder extends 
PipeTransferBatchReqBuilder {
+
+  public IoTDBThriftSyncPipeTransferBatchReqBuilder(PipeParameters parameters) 
{
+    super(parameters);
+  }
+
+  /**
+   * Try offer event into cache if the given event is not duplicated.
+   *
+   * @param event the given event
+   * @return true if the batch can be transferred
+   */
+  public boolean onEvent(TabletInsertionEvent event) throws IOException, 
WALPipeException {
+    final TPipeTransferReq req =
+        event instanceof PipeInsertNodeTabletInsertionEvent
+            ? PipeTransferInsertNodeReq.toTPipeTransferReq(
+                ((PipeInsertNodeTabletInsertionEvent) event).getInsertNode())
+            : PipeTransferTabletReq.toTPipeTransferReq(
+                ((PipeRawTabletInsertionEvent) event).convertToTablet(),
+                ((PipeRawTabletInsertionEvent) event).isAligned());
+    if (events.isEmpty() || !events.get(events.size() - 1).equals(event)) {
+      reqs.add(req);
+
+      if (event instanceof EnrichedEvent) {
+        ((EnrichedEvent) 
event).increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName());
+      }
+      events.add(event);
+
+      if (firstEventProcessingTime == Long.MIN_VALUE) {
+        firstEventProcessingTime = System.currentTimeMillis();
+      }
+
+      bufferSize += req.getBody().length;
+    }
+
+    return bufferSize >= maxBatchSizeInBytes
+        || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs;
+  }
+
+  public void onSuccess() {
+    reqs.clear();
+
+    for (final Event event : events) {
+      if (event instanceof EnrichedEvent) {
+        ((EnrichedEvent) 
event).decreaseReferenceCount(IoTDBThriftSyncConnector.class.getName());
+      }
+    }
+    events.clear();
+
+    firstEventProcessingTime = Long.MIN_VALUE;
+
+    bufferSize = 0;
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
new file mode 100644
index 00000000000..a4bfec39bf9
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;
+
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
+
+public abstract class PipeTransferBatchReqBuilder {
+
+  protected final List<TPipeTransferReq> reqs = new ArrayList<>();
+  protected final List<Event> events = new ArrayList<>();
+
+  // limit in delayed time
+  protected final int maxDelayInMs;
+  protected long firstEventProcessingTime = Long.MIN_VALUE;
+
+  // limit in buffer size
+  protected final long maxBatchSizeInBytes;
+  protected long bufferSize = 0;
+
+  protected PipeTransferBatchReqBuilder(PipeParameters parameters) {
+    maxDelayInMs =
+        parameters.getIntOrDefault(
+                CONNECTOR_IOTDB_BATCH_DELAY_KEY, 
CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE)
+            * 1000;
+    maxBatchSizeInBytes =
+        parameters.getLongOrDefault(
+            CONNECTOR_IOTDB_BATCH_SIZE_KEY, 
CONNECTOR_IOTDB_BATCH_SIZE_DEFAULT_VALUE);
+  }
+
+  public List<TPipeTransferReq> getTPipeTransferReqs() {
+    return reqs;
+  }
+
+  public boolean isEmpty() {
+    return reqs.isEmpty();
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferBatchReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferBatchReq.java
new file mode 100644
index 00000000000..7574bc16895
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferBatchReq.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
+
+import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
+import 
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class PipeTransferBatchReq extends TPipeTransferReq {
+
+  private final transient List<PipeTransferInsertNodeReq> insertNodeReqs = new 
ArrayList<>();
+  private final transient List<PipeTransferTabletReq> tabletReqs = new 
ArrayList<>();
+
+  private PipeTransferBatchReq() {
+    // Empty constructor
+  }
+
+  public Pair<InsertRowsStatement, InsertMultiTabletsStatement> 
constructStatements() {
+    final InsertRowsStatement insertRowsStatement = new InsertRowsStatement();
+    final InsertMultiTabletsStatement insertMultiTabletsStatement =
+        new InsertMultiTabletsStatement();
+
+    final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
+    final List<InsertTabletStatement> insertTabletStatementList = new 
ArrayList<>();
+
+    for (final PipeTransferInsertNodeReq insertNodeReq : insertNodeReqs) {
+      final Statement insertStatement = insertNodeReq.constructStatement();
+      if (insertStatement instanceof InsertRowStatement) {
+        insertRowStatementList.add((InsertRowStatement) insertStatement);
+      } else if (insertStatement instanceof InsertTabletStatement) {
+        insertTabletStatementList.add((InsertTabletStatement) insertStatement);
+      } else {
+        throw new UnsupportedOperationException(
+            String.format(
+                "unknown InsertBaseStatement %s constructed from the insert 
node request.",
+                insertNodeReq));
+      }
+    }
+
+    for (final PipeTransferTabletReq tabletReq : tabletReqs) {
+      insertTabletStatementList.add(tabletReq.constructStatement());
+    }
+
+    insertRowsStatement.setInsertRowStatementList(insertRowStatementList);
+    
insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
+    return new Pair<>(insertRowsStatement, insertMultiTabletsStatement);
+  }
+
+  /////////////////////////////// Thrift ///////////////////////////////
+
+  public static PipeTransferBatchReq toTPipeTransferReq(List<TPipeTransferReq> 
reqs)
+      throws IOException {
+    final PipeTransferBatchReq batchReq = new PipeTransferBatchReq();
+
+    for (final TPipeTransferReq req : reqs) {
+      if (req instanceof PipeTransferInsertNodeReq) {
+        batchReq.insertNodeReqs.add((PipeTransferInsertNodeReq) req);
+      } else if (req instanceof PipeTransferTabletReq) {
+        batchReq.tabletReqs.add((PipeTransferTabletReq) req);
+      } else {
+        throw new UnsupportedOperationException(
+            String.format(
+                "unknown TPipeTransferReq type %s when constructing 
PipeTransferBatchReq",
+                req.getType()));
+      }
+    }
+
+    batchReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
+    batchReq.type = PipeRequestType.TRANSFER_BATCH.getType();
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(batchReq.insertNodeReqs.size(), outputStream);
+      for (final PipeTransferInsertNodeReq insertNodeReq : 
batchReq.insertNodeReqs) {
+        insertNodeReq.getInsertNode().serialize(outputStream);
+      }
+
+      ReadWriteIOUtils.write(batchReq.tabletReqs.size(), outputStream);
+      for (final PipeTransferTabletReq tabletReq : batchReq.tabletReqs) {
+        tabletReq.getTablet().serialize(outputStream);
+        ReadWriteIOUtils.write(tabletReq.getIsAligned(), outputStream);
+      }
+
+      batchReq.body =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+
+    return batchReq;
+  }
+
+  public static PipeTransferBatchReq fromTPipeTransferReq(TPipeTransferReq 
transferReq)
+      throws IOException {
+    final PipeTransferBatchReq batchReq = new PipeTransferBatchReq();
+
+    int size = ReadWriteIOUtils.readInt(transferReq.body);
+    for (int i = 0; i < size; ++i) {
+      batchReq.insertNodeReqs.add(
+          PipeTransferInsertNodeReq.toTPipeTransferReq(
+              (InsertNode) PlanFragment.deserializeHelper(transferReq.body)));
+    }
+
+    size = ReadWriteIOUtils.readInt(transferReq.body);
+    for (int i = 0; i < size; ++i) {
+      batchReq.tabletReqs.add(
+          PipeTransferTabletReq.toTPipeTransferReq(
+              Tablet.deserialize(transferReq.body), 
ReadWriteIOUtils.readBool(transferReq.body)));
+    }
+
+    batchReq.version = transferReq.version;
+    batchReq.type = transferReq.type;
+    batchReq.body = transferReq.body;
+
+    return batchReq;
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    PipeTransferBatchReq that = (PipeTransferBatchReq) obj;
+    return insertNodeReqs.equals(that.insertNodeReqs)
+        && tabletReqs.equals(that.tabletReqs)
+        && version == that.version
+        && type == that.type
+        && body.equals(that.body);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(insertNodeReqs, tabletReqs, version, type, body);
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
index 12f7843d906..1b9bf9bd1b2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
@@ -34,9 +34,9 @@ import java.util.Objects;
 
 public class PipeTransferFilePieceReq extends TPipeTransferReq {
 
-  private String fileName;
-  private long startWritingOffset;
-  private byte[] filePiece;
+  private transient String fileName;
+  private transient long startWritingOffset;
+  private transient byte[] filePiece;
 
   private PipeTransferFilePieceReq() {
     // Empty constructor
@@ -93,6 +93,7 @@ public class PipeTransferFilePieceReq extends TPipeTransferReq {
   }
 
   /////////////////////////////// Air Gap ///////////////////////////////
+
   public static byte[] toTPipeTransferBytes(
       String fileName, long startWritingOffset, byte[] filePiece) throws 
IOException {
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
index e9dc715bdd1..6314068fe50 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
@@ -32,8 +32,8 @@ import java.util.Objects;
 
 public class PipeTransferFileSealReq extends TPipeTransferReq {
 
-  private String fileName;
-  private long fileLength;
+  private transient String fileName;
+  private transient long fileLength;
 
   private PipeTransferFileSealReq() {
     // Empty constructor
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
index 11d6ce7ef4b..435726bfcf1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
@@ -32,7 +32,7 @@ import java.util.Objects;
 
 public class PipeTransferHandshakeReq extends TPipeTransferReq {
 
-  private String timestampPrecision;
+  private transient String timestampPrecision;
 
   private PipeTransferHandshakeReq() {
     // Empty constructor
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
index c3da0985b45..f2bdd0db906 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferInsertNodeReq.java
@@ -39,7 +39,7 @@ import java.util.Objects;
 
 public class PipeTransferInsertNodeReq extends TPipeTransferReq {
 
-  private InsertNode insertNode;
+  private transient InsertNode insertNode;
 
   private PipeTransferInsertNodeReq() {
     // Do nothing
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
index 58e64520133..8be16edda7b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletReq.java
@@ -45,13 +45,22 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.Objects;
 
 public class PipeTransferTabletReq extends TPipeTransferReq {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTransferTabletReq.class);
 
-  private Tablet tablet;
-  private boolean isAligned;
+  private transient Tablet tablet;
+  private transient boolean isAligned;
+
+  public Tablet getTablet() {
+    return tablet;
+  }
+
+  public boolean getIsAligned() {
+    return isAligned;
+  }
 
   public InsertTabletStatement constructStatement() {
     if (!checkSorted(tablet)) {
@@ -226,6 +235,7 @@ public class PipeTransferTabletReq extends TPipeTransferReq {
   }
 
   /////////////////////////////// Air Gap ///////////////////////////////
+
   public static byte[] toTPipeTransferTabletBytes(Tablet tablet, boolean 
isAligned)
       throws IOException {
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
@@ -237,4 +247,27 @@ public class PipeTransferTabletReq extends TPipeTransferReq {
       return byteArrayOutputStream.toByteArray();
     }
   }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    PipeTransferTabletReq that = (PipeTransferTabletReq) obj;
+    return tablet.equals(that.tablet)
+        && isAligned == that.isAligned
+        && version == that.version
+        && type == that.type
+        && body.equals(that.body);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(tablet, isAligned, version, type, body);
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
index 3b218c60e04..8de5706a958 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnector.java
@@ -35,6 +35,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE;
+import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLED_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY;
 import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
@@ -45,6 +47,8 @@ public abstract class IoTDBConnector implements PipeConnector 
{
 
   protected final List<TEndPoint> nodeUrls = new ArrayList<>();
 
+  protected boolean isTabletBatchModeEnabled = true;
+
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
     final PipeParameters parameters = validator.getParameters();
@@ -79,7 +83,12 @@ public abstract class IoTDBConnector implements 
PipeConnector {
 
     nodeUrls.clear();
     nodeUrls.addAll(givenNodeUrls);
+    LOGGER.info("IoTDBConnector nodeUrls: {}", nodeUrls);
 
-    LOGGER.info("IoTDBThriftConnector nodeUrls: {}", nodeUrls);
+    isTabletBatchModeEnabled =
+        parameters.getBooleanOrDefault(
+            CONNECTOR_IOTDB_BATCH_MODE_ENABLED_KEY,
+            CONNECTOR_IOTDB_BATCH_MODE_ENABLED_DEFAULT_VALUE);
+    LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", 
isTabletBatchModeEnabled);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index 8a0e11f5ebf..9830e44c9e7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -76,6 +76,14 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
   public void customize(PipeParameters parameters, 
PipeConnectorRuntimeConfiguration configuration)
       throws Exception {
     super.customize(parameters, configuration);
+
+    if (isTabletBatchModeEnabled) {
+      LOGGER.warn(
+          "Batch mode is enabled by the given parameters. "
+              + "IoTDBAirGapConnector does not support batch mode. "
+              + "Disable batch mode.");
+    }
+
     for (int i = 0; i < nodeUrls.size(); i++) {
       isSocketAlive.add(false);
       sockets.add(null);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index a7ee768226b..b588a0b168d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -27,12 +27,14 @@ import 
org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftAsyncPipeTransferBatchReqBuilder;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
 import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnector;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferInsertNodeTabletInsertionEventHandler;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferRawTabletInsertionEventHandler;
+import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletBatchInsertionEventHandler;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTsFileInsertionEventHandler;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
@@ -59,6 +61,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.Comparator;
 import java.util.Optional;
 import java.util.PriorityQueue;
@@ -96,6 +99,8 @@ public class IoTDBThriftAsyncConnector extends IoTDBConnector 
{
   private final PriorityQueue<Pair<Long, Runnable>> commitQueue =
       new PriorityQueue<>(Comparator.comparing(o -> o.left));
 
+  private IoTDBThriftAsyncPipeTransferBatchReqBuilder tabletBatchBuilder;
+
   public IoTDBThriftAsyncConnector() {
     if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
       synchronized (IoTDBThriftAsyncConnector.class) {
@@ -120,11 +125,16 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
   public void customize(PipeParameters parameters, 
PipeConnectorRuntimeConfiguration configuration)
       throws Exception {
     super.customize(parameters, configuration);
+
     retryConnector.customize(parameters, configuration);
+
+    if (isTabletBatchModeEnabled) {
+      tabletBatchBuilder = new 
IoTDBThriftAsyncPipeTransferBatchReqBuilder(parameters);
+    }
   }
 
   @Override
-  // synchronized to avoid close connector when transfer event
+  // Synchronized so that the connector cannot be closed while an event is being transferred
   public synchronized void handshake() throws Exception {
     retryConnector.handshake();
   }
@@ -149,29 +159,70 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
 
     final long requestCommitId = commitIdGenerator.incrementAndGet();
 
-    if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
-      final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent =
-          (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
-      final PipeTransferInsertNodeReq pipeTransferInsertNodeReq =
-          PipeTransferInsertNodeReq.toTPipeTransferReq(
-              pipeInsertNodeTabletInsertionEvent.getInsertNode());
-      final PipeTransferInsertNodeTabletInsertionEventHandler 
pipeTransferInsertNodeReqHandler =
-          new PipeTransferInsertNodeTabletInsertionEventHandler(
-              requestCommitId, pipeInsertNodeTabletInsertionEvent, 
pipeTransferInsertNodeReq, this);
-
-      transfer(requestCommitId, pipeTransferInsertNodeReqHandler);
-    } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
-      final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
-          (PipeRawTabletInsertionEvent) tabletInsertionEvent;
-      final PipeTransferTabletReq pipeTransferTabletReq =
-          PipeTransferTabletReq.toTPipeTransferReq(
-              pipeRawTabletInsertionEvent.convertToTablet(),
-              pipeRawTabletInsertionEvent.isAligned());
-      final PipeTransferRawTabletInsertionEventHandler 
pipeTransferTabletReqHandler =
-          new PipeTransferRawTabletInsertionEventHandler(
-              requestCommitId, pipeRawTabletInsertionEvent, 
pipeTransferTabletReq, this);
-
-      transfer(requestCommitId, pipeTransferTabletReqHandler);
+    if (isTabletBatchModeEnabled) {
+      if (tabletBatchBuilder.onEvent(tabletInsertionEvent, requestCommitId)) {
+        final PipeTransferTabletBatchInsertionEventHandler
+            pipeTransferTabletBatchInsertionEventHandler =
+                new 
PipeTransferTabletBatchInsertionEventHandler(tabletBatchBuilder, this);
+
+        transfer(requestCommitId, 
pipeTransferTabletBatchInsertionEventHandler);
+
+        tabletBatchBuilder.onSuccess();
+      }
+    } else {
+      if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+        final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent =
+            (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
+        final PipeTransferInsertNodeReq pipeTransferInsertNodeReq =
+            PipeTransferInsertNodeReq.toTPipeTransferReq(
+                pipeInsertNodeTabletInsertionEvent.getInsertNode());
+        final PipeTransferInsertNodeTabletInsertionEventHandler 
pipeTransferInsertNodeReqHandler =
+            new PipeTransferInsertNodeTabletInsertionEventHandler(
+                requestCommitId,
+                pipeInsertNodeTabletInsertionEvent,
+                pipeTransferInsertNodeReq,
+                this);
+
+        transfer(requestCommitId, pipeTransferInsertNodeReqHandler);
+      } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
+        final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
+            (PipeRawTabletInsertionEvent) tabletInsertionEvent;
+        final PipeTransferTabletReq pipeTransferTabletReq =
+            PipeTransferTabletReq.toTPipeTransferReq(
+                pipeRawTabletInsertionEvent.convertToTablet(),
+                pipeRawTabletInsertionEvent.isAligned());
+        final PipeTransferRawTabletInsertionEventHandler 
pipeTransferTabletReqHandler =
+            new PipeTransferRawTabletInsertionEventHandler(
+                requestCommitId, pipeRawTabletInsertionEvent, 
pipeTransferTabletReq, this);
+
+        transfer(requestCommitId, pipeTransferTabletReqHandler);
+      }
+    }
+  }
+
+  /**
+   * Sends one batched tablet request through an async client.
+   *
+   * <p>The target endpoint is chosen as {@code requestCommitId % nodeUrls.size()}, a client is
+   * borrowed for that endpoint, and the handler is asked to perform the transfer with it.
+   *
+   * @param requestCommitId used in this method only to select the target endpoint
+   * @param pipeTransferTabletBatchInsertionEventHandler handler that performs the transfer and
+   *     whose {@code onError} is invoked when the outer catch is reached
+   */
+  private void transfer(
+      long requestCommitId,
+      PipeTransferTabletBatchInsertionEventHandler 
pipeTransferTabletBatchInsertionEventHandler) {
+    final TEndPoint targetNodeUrl = nodeUrls.get((int) (requestCommitId % 
nodeUrls.size()));
+
+    try {
+      final AsyncPipeDataTransferServiceClient client = 
borrowClient(targetNodeUrl);
+
+      try {
+        pipeTransferTabletBatchInsertionEventHandler.transfer(client);
+      } catch (TException e) {
+        // NOTE(review): a TException is only logged here; onError is not called on this path,
+        // so the "retrying..." wording presumably relies on the async client invoking the
+        // handler's callback itself. TODO confirm.
+        LOGGER.warn(
+            String.format(
+                "Transfer batched insertion requests to receiver %s:%s error, 
retrying...",
+                targetNodeUrl.getIp(), targetNodeUrl.getPort()),
+            e);
+      }
+    } catch (Exception ex) {
+      // Reached when borrowing the client fails, and also for any non-TException thrown by the
+      // handler above. The handler's onError is invoked first, then the failure is logged with
+      // the borrow-client message format.
+      pipeTransferTabletBatchInsertionEventHandler.onError(ex);
+      LOGGER.warn(
+          String.format(
+              FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(), 
targetNodeUrl.getPort()),
+          ex);
     }
   }
 
@@ -230,6 +281,7 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
   @Override
   public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
     transferQueuedEventsIfNecessary();
+    transferBatchedEventsIfNecessary();
 
     if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
       LOGGER.warn(
@@ -279,6 +331,7 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
   @Override
   public void transfer(Event event) throws Exception {
     transferQueuedEventsIfNecessary();
+    transferBatchedEventsIfNecessary();
 
     LOGGER.warn("IoTDBThriftAsyncConnector does not support transfer generic 
event: {}.", event);
   }
@@ -414,6 +467,22 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
     }
   }
 
+  /**
+   * Sends the tablets currently buffered in the batch builder, if there are any.
+   *
+   * <p>Called before other kinds of events are handled so that, on a best-effort basis, data is
+   * committed in order; such a flush is an additional trigger for transferring batched data.
+   * Does nothing when batch mode is disabled or the builder is empty.
+   *
+   * <p>NOTE(review): the commit id drawn here is passed to {@code transfer}, which uses it only
+   * to pick the target endpoint; the handler takes its commit ids from the builder. Confirm
+   * that the commit queue tolerates an id that is never committed.
+   *
+   * @throws IOException if building the batch handler fails
+   */
+  private void transferBatchedEventsIfNecessary() throws IOException {
+    final boolean hasBufferedTablets = isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty();
+    if (hasBufferedTablets) {
+      final long flushCommitId = commitIdGenerator.incrementAndGet();
+
+      // The handler is built from the builder's current content before the builder is reset.
+      transfer(
+          flushCommitId,
+          new PipeTransferTabletBatchInsertionEventHandler(tabletBatchBuilder, this));
+
+      tabletBatchBuilder.onSuccess();
+    }
+  }
+
   /**
    * Commit the event. Decrease the reference count of the event. If the 
reference count is 0, the
    * progress index of the event will be recalculated and the resources of the 
event will be
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchInsertionEventHandler.java
similarity index 62%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchInsertionEventHandler.java
index 85c88957d3d..dc183d762b3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchInsertionEventHandler.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
 
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftAsyncPipeTransferBatchReqBuilder;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferBatchReq;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -33,59 +35,50 @@ import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Optional;
+import java.io.IOException;
+import java.util.List;
 
-public abstract class PipeTransferTabletInsertionEventHandler<E extends 
TPipeTransferResp>
-    implements AsyncMethodCallback<E> {
+public class PipeTransferTabletBatchInsertionEventHandler
+    implements AsyncMethodCallback<TPipeTransferResp> {
 
   private static final Logger LOGGER =
-      LoggerFactory.getLogger(PipeTransferTabletInsertionEventHandler.class);
+      
LoggerFactory.getLogger(PipeTransferTabletBatchInsertionEventHandler.class);
 
-  private final long requestCommitId;
-  private final Event event;
+  private final List<Long> requestCommitIds;
+  private final List<Event> events;
   private final TPipeTransferReq req;
 
   private final IoTDBThriftAsyncConnector connector;
 
-  protected PipeTransferTabletInsertionEventHandler(
-      long requestCommitId,
-      Event event,
-      TPipeTransferReq req,
-      IoTDBThriftAsyncConnector connector) {
-    this.requestCommitId = requestCommitId;
-    this.event = event;
-    this.req = req;
-    this.connector = connector;
+  public PipeTransferTabletBatchInsertionEventHandler(
+      IoTDBThriftAsyncPipeTransferBatchReqBuilder batchBuilder, 
IoTDBThriftAsyncConnector connector)
+      throws IOException {
+    // Copy the commit ids and events so that this handler keeps its own references to them
+    requestCommitIds = batchBuilder.deepcopyRequestCommitIds();
+    events = batchBuilder.deepcopyEvents();
+    req = 
PipeTransferBatchReq.toTPipeTransferReq(batchBuilder.getTPipeTransferReqs());
 
-    Optional.ofNullable(event)
-        .ifPresent(
-            e -> {
-              if (e instanceof EnrichedEvent) {
-                ((EnrichedEvent) e)
-                    .increaseReferenceCount(
-                        
PipeTransferTabletInsertionEventHandler.class.getName());
-              }
-            });
+    this.connector = connector;
   }
 
   public void transfer(AsyncPipeDataTransferServiceClient client) throws 
TException {
-    doTransfer(client, req);
+    client.pipeTransfer(req, this);
   }
 
-  protected abstract void doTransfer(
-      AsyncPipeDataTransferServiceClient client, TPipeTransferReq req) throws 
TException;
-
   @Override
   public void onComplete(TPipeTransferResp response) {
-    // just in case
+    // Just in case
     if (response == null) {
       onError(new PipeException("TPipeTransferResp is null"));
       return;
     }
 
     if (response.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      connector.commit(
-          requestCommitId, event instanceof EnrichedEvent ? (EnrichedEvent) 
event : null);
+      for (int i = 0; i < events.size(); ++i) {
+        connector.commit(
+            requestCommitIds.get(i),
+            events.get(i) instanceof EnrichedEvent ? (EnrichedEvent) 
events.get(i) : null);
+      }
     } else {
       onError(new PipeException(response.getStatus().getMessage()));
     }
@@ -95,10 +88,12 @@ public abstract class 
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
   public void onError(Exception exception) {
     LOGGER.warn(
         "Failed to transfer TabletInsertionEvent {} (requestCommitId={}).",
-        event,
-        requestCommitId,
+        events,
+        requestCommitIds,
         exception);
 
-    connector.addFailureEventToRetryQueue(requestCommitId, event);
+    for (int i = 0; i < events.size(); ++i) {
+      connector.addFailureEventToRetryQueue(requestCommitIds.get(i), 
events.get(i));
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index 85c88957d3d..c76b81011e3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -77,7 +77,7 @@ public abstract class 
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
 
   @Override
   public void onComplete(TPipeTransferResp response) {
-    // just in case
+    // Just in case
     if (response == null) {
       onError(new PipeException("TPipeTransferResp is null"));
       return;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
index e682247cc66..c4764219859 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java
@@ -22,7 +22,9 @@ package 
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftSyncPipeTransferBatchReqBuilder;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferBatchReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
@@ -65,6 +67,8 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector {
 
   private long currentClientIndex = 0;
 
+  private IoTDBThriftSyncPipeTransferBatchReqBuilder tabletBatchBuilder;
+
   public IoTDBThriftSyncConnector() {
     // Do nothing
   }
@@ -73,10 +77,15 @@ public class IoTDBThriftSyncConnector extends 
IoTDBConnector {
   public void customize(PipeParameters parameters, 
PipeConnectorRuntimeConfiguration configuration)
       throws Exception {
     super.customize(parameters, configuration);
+
     for (int i = 0; i < nodeUrls.size(); i++) {
       isClientAlive.add(false);
       clients.add(null);
     }
+
+    if (isTabletBatchModeEnabled) {
+      tabletBatchBuilder = new 
IoTDBThriftSyncPipeTransferBatchReqBuilder(parameters);
+    }
   }
 
   @Override
@@ -89,7 +98,7 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector {
       final String ip = nodeUrls.get(i).getIp();
       final int port = nodeUrls.get(i).getPort();
 
-      // close the client if necessary
+      // Close the client if necessary
       if (clients.get(i) != null) {
         try {
           clients.set(i, null).close();
@@ -163,21 +172,30 @@ public class IoTDBThriftSyncConnector extends 
IoTDBConnector {
   @Override
   public void transfer(TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
     // PipeProcessor can change the type of TabletInsertionEvent
+    if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
+        && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
+      LOGGER.warn(
+          "IoTDBThriftSyncConnector only support "
+              + "PipeInsertNodeTabletInsertionEvent and 
PipeRawTabletInsertionEvent. "
+              + "Ignore {}.",
+          tabletInsertionEvent);
+      return;
+    }
 
     final int clientIndex = nextClientIndex();
     final IoTDBThriftSyncConnectorClient client = clients.get(clientIndex);
 
     try {
-      if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
-        doTransfer(client, (PipeInsertNodeTabletInsertionEvent) 
tabletInsertionEvent);
-      } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
-        doTransfer(client, (PipeRawTabletInsertionEvent) tabletInsertionEvent);
+      if (isTabletBatchModeEnabled) {
+        if (tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
+          doTransfer(client);
+        }
       } else {
-        LOGGER.warn(
-            "IoTDBThriftSyncConnector only support "
-                + "PipeInsertNodeTabletInsertionEvent and 
PipeRawTabletInsertionEvent. "
-                + "Ignore {}.",
-            tabletInsertionEvent);
+        if (tabletInsertionEvent instanceof 
PipeInsertNodeTabletInsertionEvent) {
+          doTransfer(client, (PipeInsertNodeTabletInsertionEvent) 
tabletInsertionEvent);
+        } else {
+          doTransfer(client, (PipeRawTabletInsertionEvent) 
tabletInsertionEvent);
+        }
       }
     } catch (TException e) {
       isClientAlive.set(clientIndex, false);
@@ -204,6 +222,11 @@ public class IoTDBThriftSyncConnector extends 
IoTDBConnector {
     final IoTDBThriftSyncConnectorClient client = clients.get(clientIndex);
 
     try {
+      // Flush the batched tablets first so that events are still committed in order
+      if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
+        doTransfer(client);
+      }
+
       doTransfer(client, (PipeTsFileInsertionEvent) tsFileInsertionEvent);
     } catch (TException e) {
       isClientAlive.set(clientIndex, false);
@@ -221,6 +244,19 @@ public class IoTDBThriftSyncConnector extends 
IoTDBConnector {
     LOGGER.warn("IoTDBThriftSyncConnector does not support transfer generic 
event: {}.", event);
   }
 
+  /**
+   * Sends all requests buffered in the tablet batch builder as a single batch request.
+   *
+   * <p>When the receiver answers with the success status code, the builder's {@code onSuccess}
+   * is called; any other status is reported as a {@link PipeException}.
+   *
+   * @param client the sync client used for the transfer
+   * @throws IOException if the batch request cannot be built
+   * @throws TException if the RPC itself fails
+   */
+  private void doTransfer(IoTDBThriftSyncConnectorClient client) throws IOException, TException {
+    final TPipeTransferResp reply =
+        client.pipeTransfer(
+            PipeTransferBatchReq.toTPipeTransferReq(tabletBatchBuilder.getTPipeTransferReqs()));
+
+    final boolean accepted =
+        reply.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+    if (accepted) {
+      tabletBatchBuilder.onSuccess();
+      return;
+    }
+
+    throw new PipeException(
+        String.format("Transfer PipeTransferBatchReq error, result status %s", reply.status));
+  }
+
   private void doTransfer(
       IoTDBThriftSyncConnectorClient client,
       PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
index 09d6870f489..454a5dc8faa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferBatchReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
@@ -39,6 +40,8 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
@@ -47,6 +50,7 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,6 +61,8 @@ import java.io.RandomAccessFile;
 import java.nio.file.Files;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class IoTDBThriftReceiverV1 implements IoTDBThriftReceiver {
 
@@ -76,37 +82,48 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
   @Override
   public synchronized TPipeTransferResp receive(
       TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher 
schemaFetcher) {
-    final short rawRequestType = req.getType();
-    if (PipeRequestType.isValidatedRequestType(rawRequestType)) {
-      switch (PipeRequestType.valueOf(rawRequestType)) {
-        case HANDSHAKE:
-          return 
handleTransferHandshake(PipeTransferHandshakeReq.fromTPipeTransferReq(req));
-        case TRANSFER_INSERT_NODE:
-          return handleTransferInsertNode(
-              PipeTransferInsertNodeReq.fromTPipeTransferReq(req), 
partitionFetcher, schemaFetcher);
-        case TRANSFER_TABLET:
-          return handleTransferTablet(
-              PipeTransferTabletReq.fromTPipeTransferReq(req), 
partitionFetcher, schemaFetcher);
-        case TRANSFER_FILE_PIECE:
-          return handleTransferFilePiece(
-              PipeTransferFilePieceReq.fromTPipeTransferReq(req),
-              req instanceof AirGapPseudoTPipeTransferRequest);
-        case TRANSFER_FILE_SEAL:
-          return handleTransferFileSeal(
-              PipeTransferFileSealReq.fromTPipeTransferReq(req), 
partitionFetcher, schemaFetcher);
-        default:
-          break;
+    try {
+      final short rawRequestType = req.getType();
+      if (PipeRequestType.isValidatedRequestType(rawRequestType)) {
+        switch (PipeRequestType.valueOf(rawRequestType)) {
+          case HANDSHAKE:
+            return 
handleTransferHandshake(PipeTransferHandshakeReq.fromTPipeTransferReq(req));
+          case TRANSFER_INSERT_NODE:
+            return handleTransferInsertNode(
+                PipeTransferInsertNodeReq.fromTPipeTransferReq(req),
+                partitionFetcher,
+                schemaFetcher);
+          case TRANSFER_TABLET:
+            return handleTransferTablet(
+                PipeTransferTabletReq.fromTPipeTransferReq(req), 
partitionFetcher, schemaFetcher);
+          case TRANSFER_BATCH:
+            return handleTransferBatch(
+                PipeTransferBatchReq.fromTPipeTransferReq(req), 
partitionFetcher, schemaFetcher);
+          case TRANSFER_FILE_PIECE:
+            return handleTransferFilePiece(
+                PipeTransferFilePieceReq.fromTPipeTransferReq(req),
+                req instanceof AirGapPseudoTPipeTransferRequest);
+          case TRANSFER_FILE_SEAL:
+            return handleTransferFileSeal(
+                PipeTransferFileSealReq.fromTPipeTransferReq(req), 
partitionFetcher, schemaFetcher);
+          default:
+            break;
+        }
       }
-    }
 
-    // unknown request type, which means the request can not be handled by 
this receiver,
-    // maybe the version of the receiver is not compatible with the sender
-    final TSStatus status =
-        RpcUtils.getStatus(
-            TSStatusCode.PIPE_TYPE_ERROR,
-            String.format("Unknown PipeRequestType %s.", rawRequestType));
-    LOGGER.warn("Unknown PipeRequestType, response status = {}.", status);
-    return new TPipeTransferResp(status);
+      // Unknown request type, which means the request can not be handled by 
this receiver,
+      // maybe the version of the receiver is not compatible with the sender
+      final TSStatus status =
+          RpcUtils.getStatus(
+              TSStatusCode.PIPE_TYPE_ERROR,
+              String.format("Unknown PipeRequestType %s.", rawRequestType));
+      LOGGER.warn("Unknown PipeRequestType, response status = {}.", status);
+      return new TPipeTransferResp(status);
+    } catch (IOException e) {
+      String error = String.format("Serialization error during pipe receiving, 
%s", e);
+      LOGGER.warn(error);
+      return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, 
error));
+    }
   }
 
   private TPipeTransferResp handleTransferHandshake(PipeTransferHandshakeReq 
req) {
@@ -187,6 +204,24 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
             : executeStatement(statement, partitionFetcher, schemaFetcher));
   }
 
+  /**
+   * Handles a batch request by executing the two statements constructed from it.
+   *
+   * <p>The request yields a pair of statements: an insert-rows statement and an
+   * insert-multi-tablets statement. Each non-empty statement is executed, rows first; an empty
+   * one counts as success without being executed. The two statuses are squashed into the single
+   * status of the response.
+   *
+   * @param req the batch request received from the sender
+   * @param partitionFetcher passed through to statement execution
+   * @param schemaFetcher passed through to statement execution
+   * @return the response carrying the squashed status
+   */
+  private TPipeTransferResp handleTransferBatch(
+      PipeTransferBatchReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
+    final Pair<InsertRowsStatement, InsertMultiTabletsStatement> statements =
+        req.constructStatements();
+
+    final InsertRowsStatement insertRows = statements.getLeft();
+    final TSStatus rowsStatus;
+    if (insertRows.isEmpty()) {
+      rowsStatus = RpcUtils.SUCCESS_STATUS;
+    } else {
+      rowsStatus = executeStatement(insertRows, partitionFetcher, schemaFetcher);
+    }
+
+    final InsertMultiTabletsStatement insertMultiTablets = statements.getRight();
+    final TSStatus tabletsStatus;
+    if (insertMultiTablets.isEmpty()) {
+      tabletsStatus = RpcUtils.SUCCESS_STATUS;
+    } else {
+      tabletsStatus = executeStatement(insertMultiTablets, partitionFetcher, schemaFetcher);
+    }
+
+    return new TPipeTransferResp(
+        RpcUtils.squashResponseStatusList(
+            Stream.of(rowsStatus, tabletsStatus).collect(Collectors.toList())));
+  }
+
   private TPipeTransferResp handleTransferFilePiece(
       PipeTransferFilePieceReq req, boolean isRequestThroughAirGap) {
     try {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 20394f14ae0..23428d75bec 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -37,6 +37,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -112,11 +113,13 @@ public class InsertRowsNode extends InsertNode {
 
   @Override
   public List<PlanNode> getChildren() {
-    return null;
+    return Collections.emptyList();
   }
 
   @Override
-  public void addChild(PlanNode child) {}
+  public void addChild(PlanNode child) {
+    // Do nothing
+  }
 
   @Override
   public boolean equals(Object o) {
@@ -145,7 +148,7 @@ public class InsertRowsNode extends InsertNode {
 
   @Override
   public List<String> getOutputColumnNames() {
-    return null;
+    return Collections.emptyList();
   }
 
   public static InsertRowsNode deserialize(ByteBuffer byteBuffer) {

Reply via email to