This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b981972bd7 Pipe: Support InsertRows in 
PipeInsertNodeTabletInsertionEvent (#12286)
1b981972bd7 is described below

commit 1b981972bd7b95259e75a5dcb9fb74c740d51fc4
Author: Caideyipi <[email protected]>
AuthorDate: Fri Apr 26 14:31:44 2024 +0800

    Pipe: Support InsertRows in PipeInsertNodeTabletInsertionEvent (#12286)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../response/pipe/task/PipeTableResp.java          |   8 +-
 .../iotdb/confignode/manager/ProcedureManager.java |   2 +-
 .../receiver/IoTDBConfigNodeReceiverAgent.java     |   6 +-
 .../impl/pipe/task/AlterPipeProcedureV2.java       |   2 +-
 .../request/PipeTransferTabletInsertNodeReq.java   |  46 ++------
 .../protocol/legacy/IoTDBLegacyPipeConnector.java  |  11 +-
 .../connector/protocol/opcua/OpcUaConnector.java   |   4 +-
 .../protocol/websocket/WebSocketConnector.java     |  17 ++-
 .../websocket/WebSocketConnectorServer.java        |   5 +-
 .../db/pipe/event/common/row/PipeRowCollector.java |   4 +-
 .../tablet/PipeInsertNodeTabletInsertionEvent.java | 121 +++++++++++++++------
 .../tablet/TabletInsertionDataContainer.java       |  15 ++-
 .../realtime/PipeRealtimeDataRegionExtractor.java  |   6 +-
 .../processor/aggregate/AggregateProcessor.java    |   2 +-
 .../twostage/plugin/TwoStageCountProcessor.java    |   3 +
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  13 ++-
 .../visitor/PipePlanToStatementVisitor.java        | 105 +++++++++++++-----
 .../pipe/task/connection/PipeEventCollector.java   |   3 +-
 .../planner/plan/node/write/InsertRowsNode.java    |   9 +-
 .../broker/SubscriptionPrefetchingQueue.java       |  28 ++---
 .../commons/pipe/task/meta/PipeStaticMeta.java     |  29 ++---
 21 files changed, 274 insertions(+), 165 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
index 31aabb6f031..9385f43f9a8 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
@@ -51,13 +51,13 @@ public class PipeTableResp implements DataSet {
     return allPipeMeta;
   }
 
-  public PipeTableResp filter(Boolean whereClause, String pipeName) {
+  public PipeTableResp filter(final Boolean whereClause, final String 
pipeName) {
     if (whereClause == null || !whereClause) {
       if (pipeName == null) {
         return this;
       } else {
         final List<PipeMeta> filteredPipeMeta = new ArrayList<>();
-        for (PipeMeta pipeMeta : allPipeMeta) {
+        for (final PipeMeta pipeMeta : allPipeMeta) {
           if (pipeMeta.getStaticMeta().getPipeName().equals(pipeName)) {
             filteredPipeMeta.add(pipeMeta);
             break;
@@ -70,7 +70,7 @@ public class PipeTableResp implements DataSet {
         return this;
       } else {
         String sortedConnectorParametersString = null;
-        for (PipeMeta pipeMeta : allPipeMeta) {
+        for (final PipeMeta pipeMeta : allPipeMeta) {
           if (pipeMeta.getStaticMeta().getPipeName().equals(pipeName)) {
             sortedConnectorParametersString =
                 pipeMeta.getStaticMeta().getConnectorParameters().toString();
@@ -79,7 +79,7 @@ public class PipeTableResp implements DataSet {
         }
 
         final List<PipeMeta> filteredPipeMeta = new ArrayList<>();
-        for (PipeMeta pipeMeta : allPipeMeta) {
+        for (final PipeMeta pipeMeta : allPipeMeta) {
           if (pipeMeta
               .getStaticMeta()
               .getConnectorParameters()
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index ed4bf94df68..58cdac06271 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -1153,7 +1153,7 @@ public class ProcedureManager {
       final Procedure<ConfigNodeProcedureEnv> finishedProcedure =
           executor.getResultOrProcedure(procedureId);
       if (!finishedProcedure.isFinished()) {
-        // the procedure is still executing
+        // The procedure is still executing
         statusList.add(
             RpcUtils.getStatus(TSStatusCode.OVERLAP_WITH_EXISTING_TASK, 
PROCEDURE_TIMEOUT_MESSAGE));
         isSucceed = false;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/receiver/IoTDBConfigNodeReceiverAgent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/receiver/IoTDBConfigNodeReceiverAgent.java
index 3b7b5557dca..fce5367398f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/receiver/IoTDBConfigNodeReceiverAgent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/receiver/IoTDBConfigNodeReceiverAgent.java
@@ -41,17 +41,17 @@ public class IoTDBConfigNodeReceiverAgent extends 
IoTDBReceiverAgent {
   }
 
   @Override
-  protected IoTDBReceiver getReceiverWithSpecifiedClient(String key) {
+  protected IoTDBReceiver getReceiverWithSpecifiedClient(final String key) {
     return clientKey2ReceiverMap.get(key);
   }
 
   @Override
-  protected void setReceiverWithSpecifiedClient(String key, IoTDBReceiver 
receiver) {
+  protected void setReceiverWithSpecifiedClient(final String key, final 
IoTDBReceiver receiver) {
     clientKey2ReceiverMap.put(key, receiver);
   }
 
   @Override
-  protected void removeReceiverWithSpecifiedClient(String key) {
+  protected void removeReceiverWithSpecifiedClient(final String key) {
     clientKey2ReceiverMap.remove(key);
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index 3fa1d11226b..936ad32e01d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -111,7 +111,7 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
     final Map<Integer, PipeTaskMeta> currentConsensusGroupId2PipeTaskMeta =
         currentPipeRuntimeMeta.getConsensusGroupId2TaskMetaMap();
 
-    // deep copy reused attributes
+    // Deep copy reused attributes
     updatedPipeStaticMeta =
         new PipeStaticMeta(
             alterPipeRequest.getPipeName(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
index 88fe4d738db..7fad79e843e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
@@ -21,13 +21,13 @@ package 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
 
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
+import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
 import org.apache.tsfile.utils.BytesUtils;
@@ -51,41 +51,17 @@ public class PipeTransferTabletInsertNodeReq extends 
TPipeTransferReq {
   }
 
   public InsertBaseStatement constructStatement() {
-    if (insertNode instanceof InsertRowNode) {
-      final InsertRowNode node = (InsertRowNode) insertNode;
-
-      final InsertRowStatement statement = new InsertRowStatement();
-      statement.setDevicePath(node.getDevicePath());
-      statement.setTime(node.getTime());
-      statement.setMeasurements(node.getMeasurements());
-      statement.setDataTypes(node.getDataTypes());
-      statement.setValues(node.getValues());
-      statement.setNeedInferType(node.isNeedInferType());
-      statement.setAligned(node.isAligned());
-      statement.setMeasurementSchemas(node.getMeasurementSchemas());
-      return statement;
+    if (!(insertNode instanceof InsertRowNode
+        || insertNode instanceof InsertTabletNode
+        || insertNode instanceof InsertRowsNode)) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Unknown InsertNode type %s when constructing statement from 
insert node.",
+              insertNode));
     }
 
-    if (insertNode instanceof InsertTabletNode) {
-      final InsertTabletNode node = (InsertTabletNode) insertNode;
-
-      final InsertTabletStatement statement = new InsertTabletStatement();
-      statement.setDevicePath(node.getDevicePath());
-      statement.setMeasurements(node.getMeasurements());
-      statement.setTimes(node.getTimes());
-      statement.setColumns(node.getColumns());
-      statement.setBitMaps(node.getBitMaps());
-      statement.setRowCount(node.getRowCount());
-      statement.setDataTypes(node.getDataTypes());
-      statement.setAligned(node.isAligned());
-      statement.setMeasurementSchemas(node.getMeasurementSchemas());
-      return statement;
-    }
-
-    throw new UnsupportedOperationException(
-        String.format(
-            "unknown InsertNode type %s when constructing statement from 
insert node.",
-            insertNode));
+    return (InsertBaseStatement)
+        IoTDBDataNodeReceiver.PLAN_TO_STATEMENT_VISITOR.process(insertNode, 
null);
   }
 
   /////////////////////////////// WriteBack & Batch 
///////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index dd12caf55a1..355d354c385 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -325,11 +325,12 @@ public class IoTDBLegacyPipeConnector implements 
PipeConnector {
 
   private void doTransfer(final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeInsertionEvent)
       throws IoTDBConnectionException, StatementExecutionException {
-    final Tablet tablet = pipeInsertNodeInsertionEvent.convertToTablet();
-    if (pipeInsertNodeInsertionEvent.isAligned()) {
-      sessionPool.insertAlignedTablet(tablet);
-    } else {
-      sessionPool.insertTablet(tablet);
+    for (final Tablet tablet : 
pipeInsertNodeInsertionEvent.convertToTablets()) {
+      if (pipeInsertNodeInsertionEvent.isAligned()) {
+        sessionPool.insertAlignedTablet(tablet);
+      } else {
+        sessionPool.insertTablet(tablet);
+      }
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
index 5efb8000236..fa1c372db28 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
@@ -186,7 +186,9 @@ public class OpcUaConnector implements PipeConnector {
           OpcUaConnector.class.getName())) {
         return;
       }
-      transferTablet(server, 
pipeInsertNodeTabletInsertionEvent.convertToTablet());
+      for (final Tablet tablet : 
pipeInsertNodeTabletInsertionEvent.convertToTablets()) {
+        transferTablet(server, tablet);
+      }
     } finally {
       pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(
           OpcUaConnector.class.getName(), false);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 049ab3d2e05..e1e5a5dd44c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.pipe.connector.protocol.websocket;
 
 import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
@@ -93,7 +94,7 @@ public class WebSocketConnector implements PipeConnector {
   }
 
   @Override
-  public void transfer(TabletInsertionEvent tabletInsertionEvent) {
+  public void transfer(final TabletInsertionEvent tabletInsertionEvent) {
     if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
         && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
       LOGGER.warn(
@@ -103,6 +104,20 @@ public class WebSocketConnector implements PipeConnector {
       return;
     }
 
+    if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
+      final PipeInsertNodeTabletInsertionEvent event =
+          (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
+      for (final PipeRawTabletInsertionEvent rawTabletInsertionEvent :
+          event.toRawTabletInsertionEvents()) {
+        // Skip report if any tablet events is added
+        event.skipReportOnCommit();
+        // Transfer raw tablet insertion event to make sure one event binds
+        // to only one tablet in the server
+        transfer(rawTabletInsertionEvent);
+      }
+      return;
+    }
+
     ((EnrichedEvent) tabletInsertionEvent)
         .increaseReferenceCount(WebSocketConnector.class.getName());
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
index 441bc0450df..2db12e0a10e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.pipe.connector.protocol.websocket;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -368,9 +367,7 @@ public class WebSocketConnectorServer extends 
WebSocketServer {
 
       try {
         ByteBuffer tabletBuffer;
-        if (event instanceof PipeInsertNodeTabletInsertionEvent) {
-          tabletBuffer = ((PipeInsertNodeTabletInsertionEvent) 
event).convertToTablet().serialize();
-        } else if (event instanceof PipeRawTabletInsertionEvent) {
+        if (event instanceof PipeRawTabletInsertionEvent) {
           tabletBuffer = ((PipeRawTabletInsertionEvent) 
event).convertToTablet().serialize();
         } else {
           throw new NotImplementedException(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
index bf2d1659fe4..1b747652e1c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
@@ -112,11 +112,11 @@ public class PipeRowCollector implements RowCollector {
     this.tablet = null;
   }
 
-  public Iterable<TabletInsertionEvent> convertToTabletInsertionEvents() {
+  public List<TabletInsertionEvent> convertToTabletInsertionEvents(final 
boolean shouldReport) {
     collectTabletInsertionEvent();
 
     final int eventListSize = tabletInsertionEventList.size();
-    if (eventListSize > 0) { // The last event should report progress
+    if (eventListSize > 0 && shouldReport) { // The last event should report 
progress
       ((PipeRawTabletInsertionEvent) 
tabletInsertionEventList.get(eventListSize - 1))
           .markAsNeedToReport();
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 798cb1da706..dad53f2b69d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
@@ -41,8 +42,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Objects;
 import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
 
 public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent
     implements TabletInsertionEvent {
@@ -54,7 +59,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   private final boolean isAligned;
   private final boolean isGeneratedByPipe;
 
-  private TabletInsertionDataContainer dataContainer;
+  private List<TabletInsertionDataContainer> dataContainers;
 
   private ProgressIndex progressIndex;
 
@@ -129,8 +134,11 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   public boolean internallyDecreaseResourceReferenceCount(String 
holderMessage) {
     try {
       PipeResourceManager.wal().unpin(walEntryHandler);
-      // Release the container's memory.
-      dataContainer = null;
+      // Release the containers' memory.
+      if (dataContainers != null) {
+        dataContainers.clear();
+        dataContainers = null;
+      }
       return true;
     } catch (Exception e) {
       LOGGER.warn(
@@ -190,6 +198,14 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
         }
         // We assume that `timestamps` is ordered.
         return startTime <= timestamps[timestamps.length - 1] && timestamps[0] 
<= endTime;
+      } else if (insertNode instanceof InsertRowsNode) {
+        for (final InsertRowNode node : ((InsertRowsNode) 
insertNode).getInsertRowNodeList()) {
+          final long timestamp = node.getTime();
+          if (startTime <= timestamp && timestamp <= endTime) {
+            return true;
+          }
+        }
+        return false;
       } else {
         throw new UnSupportedDataTypeException(
             String.format("InsertNode type %s is not supported.", 
insertNode.getClass().getName()));
@@ -209,28 +225,18 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
 
   @Override
   public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, 
RowCollector> consumer) {
-    try {
-      if (dataContainer == null) {
-        dataContainer =
-            new TabletInsertionDataContainer(pipeTaskMeta, this, 
getInsertNode(), pipePattern);
-      }
-      return dataContainer.processRowByRow(consumer);
-    } catch (Exception e) {
-      throw new PipeException("Process row by row error.", e);
-    }
+    return initDataContainers().stream()
+        .map(tabletInsertionDataContainer -> 
tabletInsertionDataContainer.processRowByRow(consumer))
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
   }
 
   @Override
   public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, 
RowCollector> consumer) {
-    try {
-      if (dataContainer == null) {
-        dataContainer =
-            new TabletInsertionDataContainer(pipeTaskMeta, this, 
getInsertNode(), pipePattern);
-      }
-      return dataContainer.processTablet(consumer);
-    } catch (Exception e) {
-      throw new PipeException("Process tablet error.", e);
-    }
+    return initDataContainers().stream()
+        .map(tabletInsertionDataContainer -> 
tabletInsertionDataContainer.processTablet(consumer))
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
   }
 
   /////////////////////////// convertToTablet ///////////////////////////
@@ -239,21 +245,55 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
     return isAligned;
   }
 
-  public Tablet convertToTablet() {
+  public List<Tablet> convertToTablets() {
+    return initDataContainers().stream()
+        .map(TabletInsertionDataContainer::convertToTablet)
+        .collect(Collectors.toList());
+  }
+
+  /////////////////////////// dataContainer ///////////////////////////
+
+  private List<TabletInsertionDataContainer> initDataContainers() {
     try {
-      if (dataContainer == null) {
-        dataContainer =
-            new TabletInsertionDataContainer(pipeTaskMeta, this, 
getInsertNode(), pipePattern);
+      if (dataContainers != null) {
+        return dataContainers;
+      }
+
+      dataContainers = new ArrayList<>();
+      final InsertNode node = getInsertNode();
+      switch (node.getType()) {
+        case INSERT_ROW:
+        case INSERT_TABLET:
+          dataContainers.add(
+              new TabletInsertionDataContainer(pipeTaskMeta, this, node, 
pipePattern));
+          break;
+        case INSERT_ROWS:
+          for (final InsertRowNode insertRowNode : ((InsertRowsNode) 
node).getInsertRowNodeList()) {
+            dataContainers.add(
+                new TabletInsertionDataContainer(pipeTaskMeta, this, 
insertRowNode, pipePattern));
+          }
+          break;
+        default:
+          throw new UnSupportedDataTypeException("Unsupported node type " + 
node.getType());
+      }
+
+      final int size = dataContainers.size();
+      if (size > 0) {
+        dataContainers.get(size - 1).markAsNeedToReport();
       }
-      return dataContainer.convertToTablet();
+
+      return dataContainers;
     } catch (Exception e) {
-      throw new PipeException("Convert to tablet error.", e);
+      throw new PipeException("Initialize data container error.", e);
     }
   }
 
   public long count() {
-    final Tablet covertedTablet = convertToTablet();
-    return (long) covertedTablet.rowSize * covertedTablet.getSchemas().size();
+    long count = 0;
+    for (final Tablet covertedTablet : convertToTablets()) {
+      count += (long) covertedTablet.rowSize * 
covertedTablet.getSchemas().size();
+    }
+    return count;
   }
 
   /////////////////////////// parsePatternOrTime ///////////////////////////
@@ -266,9 +306,22 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
         && (Objects.isNull(node) || 
!pipePattern.coversDevice(node.getDevicePath().getFullPath()));
   }
 
-  public PipeRawTabletInsertionEvent parseEventWithPatternOrTime() {
-    return new PipeRawTabletInsertionEvent(
-        convertToTablet(), isAligned, pipeName, pipeTaskMeta, this, true);
+  public List<PipeRawTabletInsertionEvent> toRawTabletInsertionEvents() {
+    final List<PipeRawTabletInsertionEvent> events =
+        convertToTablets().stream()
+            .map(
+                tablet ->
+                    new PipeRawTabletInsertionEvent(
+                        tablet, isAligned, pipeName, pipeTaskMeta, this, 
false))
+            .filter(event -> !event.hasNoNeedParsingAndIsEmpty())
+            .collect(Collectors.toList());
+
+    final int size = events.size();
+    if (size > 0) {
+      events.get(size - 1).markAsNeedToReport();
+    }
+
+    return events;
   }
 
   /////////////////////////// Object ///////////////////////////
@@ -276,8 +329,8 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   @Override
   public String toString() {
     return String.format(
-            "PipeInsertNodeTabletInsertionEvent{walEntryHandler=%s, 
progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s, dataContainer=%s}",
-            walEntryHandler, progressIndex, isAligned, isGeneratedByPipe, 
dataContainer)
+            "PipeInsertNodeTabletInsertionEvent{walEntryHandler=%s, 
progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s, dataContainers=%s}",
+            walEntryHandler, progressIndex, isAligned, isGeneratedByPipe, 
dataContainers)
         + " - "
         + super.toString();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
index 750a77d209e..e95bc5373b1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
@@ -75,6 +75,9 @@ public class TabletInsertionDataContainer {
 
   private Tablet tablet;
 
+  // Whether the container shall report progress
+  private boolean shouldReport = false;
+
   private static final Integer CACHED_FULL_ROW_INDEX_LIST_ROW_COUNT_UPPER = 16;
   private static final Map<Integer, List<Integer>> cachedFullRowIndexList = 
new HashMap<>();
 
@@ -124,6 +127,10 @@ public class TabletInsertionDataContainer {
     return isAligned;
   }
 
+  public void markAsNeedToReport() {
+    shouldReport = true;
+  }
+
   //////////////////////////// parse ////////////////////////////
 
   private void parse(InsertRowNode insertRowNode, PipePattern pattern) {
@@ -551,7 +558,7 @@ public class TabletInsertionDataContainer {
 
   ////////////////////////////  process  ////////////////////////////
 
-  public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, 
RowCollector> consumer) {
+  public List<TabletInsertionEvent> processRowByRow(BiConsumer<Row, 
RowCollector> consumer) {
     if (valueColumns.length == 0 || timestampColumn.length == 0) {
       return Collections.emptyList();
     }
@@ -571,13 +578,13 @@ public class TabletInsertionDataContainer {
               columnNameStringList),
           rowCollector);
     }
-    return rowCollector.convertToTabletInsertionEvents();
+    return rowCollector.convertToTabletInsertionEvents(shouldReport);
   }
 
-  public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, 
RowCollector> consumer) {
+  public List<TabletInsertionEvent> processTablet(BiConsumer<Tablet, 
RowCollector> consumer) {
     final PipeRowCollector rowCollector = new PipeRowCollector(pipeTaskMeta, 
sourceEvent);
     consumer.accept(convertToTablet(), rowCollector);
-    return rowCollector.convertToTabletInsertionEvents();
+    return rowCollector.convertToTabletInsertionEvents(shouldReport);
   }
 
   ////////////////////////////  convertToTablet  ////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
index c939a9002c7..2c37ca06fb2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
@@ -290,7 +290,7 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
     }
 
     if (!pendingQueue.waitedOffer(event)) {
-      // this would not happen, but just in case.
+      // This would not happen, but just in case.
       // pendingQueue is unbounded, so it should never reach capacity.
       LOGGER.error(
           "extract: pending queue of PipeRealtimeDataRegionHybridExtractor {} "
@@ -301,7 +301,7 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
       // Do not report exception since the PipeHeartbeatEvent doesn't affect
       // the correction of pipe progress.
 
-      // ignore this event.
+      // Ignore this event.
       
event.decreaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName(), 
false);
     }
   }
@@ -344,7 +344,7 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
     if 
(event.increaseReferenceCount(PipeRealtimeDataRegionExtractor.class.getName())) 
{
       return event.getEvent();
     } else {
-      // if the event's reference count can not be increased, it means the 
data represented by
+      // If the event's reference count can not be increased, it means the 
data represented by
       // this event is not reliable anymore. the data has been lost. we simply 
discard this
       // event and report the exception to PipeRuntimeAgent.
       final String errorMessage =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
index 4d19195e0f4..dde1fab5a28 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
@@ -534,7 +534,7 @@ public class AggregateProcessor implements PipeProcessor {
                     exception.set(e);
                   }
                   rowCollector
-                      .convertToTabletInsertionEvents()
+                      .convertToTabletInsertionEvents(false)
                       .forEach(
                           tabletEvent -> {
                             try {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
index cbc14d976dc..2017051866f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java
@@ -211,6 +211,8 @@ public class TwoStageCountProcessor implements 
PipeProcessor {
       collectGlobalCountIfNecessary(eventCollector);
       commitLocalProgressIndexIfNecessary();
       triggerCombineIfNecessary();
+      eventCollector.collect(event);
+      return;
     }
 
     if (event instanceof PipeWatermarkEvent) {
@@ -218,6 +220,7 @@ public class TwoStageCountProcessor implements 
PipeProcessor {
           new Pair<>(
               new long[] {((PipeWatermarkEvent) event).getWatermark(), 
localCount.get()},
               localCommitProgressIndex.get()));
+      // TODO: Collect watermark events. We ignore it because they may cause 
OOM in collector.
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index d8b4c12c733..c9626c3ba81 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -105,8 +105,11 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   private static final String[] RECEIVER_FILE_BASE_DIRS = 
IOTDB_CONFIG.getPipeReceiverFileDirs();
   private static FolderManager folderManager = null;
 
-  private final PipeStatementTSStatusVisitor statusVisitor = new 
PipeStatementTSStatusVisitor();
-  private final PipeStatementExceptionVisitor exceptionVisitor =
+  public static final PipePlanToStatementVisitor PLAN_TO_STATEMENT_VISITOR =
+      new PipePlanToStatementVisitor();
+  private static final PipeStatementTSStatusVisitor STATEMENT_STATUS_VISITOR =
+      new PipeStatementTSStatusVisitor();
+  private static final PipeStatementExceptionVisitor 
STATEMENT_EXCEPTION_VISITOR =
       new PipeStatementExceptionVisitor();
   private final PipeStatementToBatchVisitor batchVisitor = new 
PipeStatementToBatchVisitor();
 
@@ -329,7 +332,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
                 .alterLogicalViewByPipe((AlterLogicalViewNode) 
req.getPlanNode()))
         : new TPipeTransferResp(
             executeStatementAndClassifyExceptions(
-                new PipePlanToStatementVisitor().process(req.getPlanNode(), 
null)));
+                PLAN_TO_STATEMENT_VISITOR.process(req.getPlanNode(), null)));
   }
 
   private TPipeTransferResp handleTransferConfigPlan(final TPipeTransferReq 
req) {
@@ -361,7 +364,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
             receiverId.get(),
             statement,
             result);
-        return statement.accept(statusVisitor, result);
+        return statement.accept(STATEMENT_STATUS_VISITOR, result);
       }
     } catch (final Exception e) {
       LOGGER.warn(
@@ -369,7 +372,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           receiverId.get(),
           statement,
           e);
-      return statement.accept(exceptionVisitor, e);
+      return statement.accept(STATEMENT_EXCEPTION_VISITOR, e);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java
index 44b3d67d0c0..32e3e6947ed 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java
@@ -34,8 +34,14 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.Int
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.MeasurementGroup;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.DeleteDataStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateTimeSeriesStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.AlterTimeSeriesStatement;
@@ -54,11 +60,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 public class PipePlanToStatementVisitor extends PlanVisitor<Statement, Void> {
 
   @Override
-  public Statement visitPlan(PlanNode node, Void context) {
+  public Statement visitPlan(final PlanNode node, final Void context) {
     throw new UnsupportedOperationException(
         String.format(
             "PipePlanToStatementVisitor does not support visiting general 
plan, PlanNode: %s",
@@ -66,8 +73,48 @@ public class PipePlanToStatementVisitor extends 
PlanVisitor<Statement, Void> {
   }
 
   @Override
-  public CreateTimeSeriesStatement visitCreateTimeSeries(CreateTimeSeriesNode 
node, Void context) {
-    CreateTimeSeriesStatement statement = new CreateTimeSeriesStatement();
+  public InsertRowStatement visitInsertRow(final InsertRowNode node, final 
Void context) {
+    final InsertRowStatement statement = new InsertRowStatement();
+    statement.setDevicePath(node.getDevicePath());
+    statement.setTime(node.getTime());
+    statement.setMeasurements(node.getMeasurements());
+    statement.setDataTypes(node.getDataTypes());
+    statement.setValues(node.getValues());
+    statement.setNeedInferType(node.isNeedInferType());
+    statement.setAligned(node.isAligned());
+    statement.setMeasurementSchemas(node.getMeasurementSchemas());
+    return statement;
+  }
+
+  @Override
+  public InsertTabletStatement visitInsertTablet(final InsertTabletNode node, 
final Void context) {
+    final InsertTabletStatement statement = new InsertTabletStatement();
+    statement.setDevicePath(node.getDevicePath());
+    statement.setMeasurements(node.getMeasurements());
+    statement.setTimes(node.getTimes());
+    statement.setColumns(node.getColumns());
+    statement.setBitMaps(node.getBitMaps());
+    statement.setRowCount(node.getRowCount());
+    statement.setDataTypes(node.getDataTypes());
+    statement.setAligned(node.isAligned());
+    statement.setMeasurementSchemas(node.getMeasurementSchemas());
+    return statement;
+  }
+
+  @Override
+  public InsertRowsStatement visitInsertRows(final InsertRowsNode node, final 
Void context) {
+    final InsertRowsStatement statement = new InsertRowsStatement();
+    statement.setInsertRowStatementList(
+        node.getInsertRowNodeList().stream()
+            .map(insertRowNode -> visitInsertRow(insertRowNode, context))
+            .collect(Collectors.toList()));
+    return statement;
+  }
+
+  @Override
+  public CreateTimeSeriesStatement visitCreateTimeSeries(
+      final CreateTimeSeriesNode node, final Void context) {
+    final CreateTimeSeriesStatement statement = new 
CreateTimeSeriesStatement();
     statement.setPath(node.getPath());
     statement.setDataType(node.getDataType());
     statement.setEncoding(node.getEncoding());
@@ -81,8 +128,8 @@ public class PipePlanToStatementVisitor extends 
PlanVisitor<Statement, Void> {
 
   @Override
   public CreateAlignedTimeSeriesStatement visitCreateAlignedTimeSeries(
-      CreateAlignedTimeSeriesNode node, Void context) {
-    CreateAlignedTimeSeriesStatement statement = new 
CreateAlignedTimeSeriesStatement();
+      final CreateAlignedTimeSeriesNode node, final Void context) {
+    final CreateAlignedTimeSeriesStatement statement = new 
CreateAlignedTimeSeriesStatement();
     statement.setDataTypes(node.getDataTypes());
     statement.setCompressors(node.getCompressors());
     statement.setEncodings(node.getEncodings());
@@ -96,17 +143,17 @@ public class PipePlanToStatementVisitor extends 
PlanVisitor<Statement, Void> {
 
   @Override
   public CreateMultiTimeSeriesStatement visitCreateMultiTimeSeries(
-      CreateMultiTimeSeriesNode node, Void context) {
-    CreateMultiTimeSeriesStatement statement = new 
CreateMultiTimeSeriesStatement();
-
-    List<PartialPath> paths = new ArrayList<>();
-    List<TSDataType> dataTypes = new ArrayList<>();
-    List<TSEncoding> encodings = new ArrayList<>();
-    List<CompressionType> compressors = new ArrayList<>();
-    List<Map<String, String>> propsList = new ArrayList<>();
-    List<String> aliasList = new ArrayList<>();
-    List<Map<String, String>> tagsList = new ArrayList<>();
-    List<Map<String, String>> attributesList = new ArrayList<>();
+      final CreateMultiTimeSeriesNode node, final Void context) {
+    final CreateMultiTimeSeriesStatement statement = new 
CreateMultiTimeSeriesStatement();
+
+    final List<PartialPath> paths = new ArrayList<>();
+    final List<TSDataType> dataTypes = new ArrayList<>();
+    final List<TSEncoding> encodings = new ArrayList<>();
+    final List<CompressionType> compressors = new ArrayList<>();
+    final List<Map<String, String>> propsList = new ArrayList<>();
+    final List<String> aliasList = new ArrayList<>();
+    final List<Map<String, String>> tagsList = new ArrayList<>();
+    final List<Map<String, String>> attributesList = new ArrayList<>();
 
     for (Map.Entry<PartialPath, MeasurementGroup> path2Group :
         node.getMeasurementGroupMap().entrySet()) {
@@ -146,8 +193,9 @@ public class PipePlanToStatementVisitor extends 
PlanVisitor<Statement, Void> {
   }
 
   @Override
-  public AlterTimeSeriesStatement visitAlterTimeSeries(AlterTimeSeriesNode 
node, Void context) {
-    AlterTimeSeriesStatement statement = new AlterTimeSeriesStatement();
+  public AlterTimeSeriesStatement visitAlterTimeSeries(
+      final AlterTimeSeriesNode node, final Void context) {
+    final AlterTimeSeriesStatement statement = new AlterTimeSeriesStatement();
     statement.setAlterMap(node.getAlterMap());
     statement.setAlterType(node.getAlterType());
     statement.setAttributesMap(node.getAttributesMap());
@@ -159,7 +207,7 @@ public class PipePlanToStatementVisitor extends 
PlanVisitor<Statement, Void> {
 
   @Override
   public InternalCreateTimeSeriesStatement visitInternalCreateTimeSeries(
-      InternalCreateTimeSeriesNode node, Void context) {
+      final InternalCreateTimeSeriesNode node, final Void context) {
     return new InternalCreateTimeSeriesStatement(
         node.getDevicePath(),
         node.getMeasurementGroup().getMeasurements(),
@@ -170,36 +218,37 @@ public class PipePlanToStatementVisitor extends 
PlanVisitor<Statement, Void> {
   }
 
   @Override
-  public ActivateTemplateStatement visitActivateTemplate(ActivateTemplateNode 
node, Void context) {
-    ActivateTemplateStatement statement = new ActivateTemplateStatement();
+  public ActivateTemplateStatement visitActivateTemplate(
+      final ActivateTemplateNode node, final Void context) {
+    final ActivateTemplateStatement statement = new 
ActivateTemplateStatement();
     statement.setPath(node.getActivatePath());
     return statement;
   }
 
   @Override
   public BatchActivateTemplateStatement visitInternalBatchActivateTemplate(
-      InternalBatchActivateTemplateNode node, Void context) {
+      final InternalBatchActivateTemplateNode node, final Void context) {
     return new BatchActivateTemplateStatement(
         new ArrayList<>(node.getTemplateActivationMap().keySet()));
   }
 
   @Override
   public InternalCreateMultiTimeSeriesStatement 
visitInternalCreateMultiTimeSeries(
-      InternalCreateMultiTimeSeriesNode node, Void context) {
+      final InternalCreateMultiTimeSeriesNode node, final Void context) {
     return new InternalCreateMultiTimeSeriesStatement(node.getDeviceMap());
   }
 
   @Override
   public BatchActivateTemplateStatement visitBatchActivateTemplate(
-      BatchActivateTemplateNode node, Void context) {
+      final BatchActivateTemplateNode node, final Void context) {
     return new BatchActivateTemplateStatement(
         new ArrayList<>(node.getTemplateActivationMap().keySet()));
   }
 
   @Override
   public CreateLogicalViewStatement visitCreateLogicalView(
-      CreateLogicalViewNode node, Void context) {
-    CreateLogicalViewStatement statement = new CreateLogicalViewStatement();
+      final CreateLogicalViewNode node, final Void context) {
+    final CreateLogicalViewStatement statement = new 
CreateLogicalViewStatement();
     statement.setTargetFullPaths(node.getViewPathList());
     statement.setViewExpressions(new 
ArrayList<>(node.getViewPathToSourceExpressionMap().values()));
     return statement;
@@ -208,8 +257,8 @@ public class PipePlanToStatementVisitor extends 
PlanVisitor<Statement, Void> {
   // We do not support AlterLogicalViewNode parsing and use direct rpc instead
 
   @Override
-  public DeleteDataStatement visitDeleteData(DeleteDataNode node, Void 
context) {
-    DeleteDataStatement statement = new DeleteDataStatement();
+  public DeleteDataStatement visitDeleteData(DeleteDataNode node, final Void 
context) {
+    final DeleteDataStatement statement = new DeleteDataStatement();
     statement.setDeleteEndTime(node.getDeleteEndTime());
     statement.setDeleteStartTime(node.getDeleteStartTime());
     statement.setPathList(node.getPathList());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index 730cc5bb661..c7e34b53aa6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -83,8 +83,7 @@ public class PipeEventCollector implements EventCollector, 
AutoCloseable {
 
   private void parseAndCollectEvent(PipeInsertNodeTabletInsertionEvent 
sourceEvent) {
     if (sourceEvent.shouldParseTimeOrPattern()) {
-      final PipeRawTabletInsertionEvent parsedEvent = 
sourceEvent.parseEventWithPatternOrTime();
-      if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) {
+      for (PipeRawTabletInsertionEvent parsedEvent : 
sourceEvent.toRawTabletInsertionEvents()) {
         collectEvent(parsedEvent);
       }
     } else {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 7a9cbf8190b..a828d36f951 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
@@ -58,7 +59,7 @@ public class InsertRowsNode extends InsertNode {
    */
   private List<Integer> insertRowNodeIndexList;
 
-  /** the InsertRowsNode list */
+  /** The {@link InsertRowNode} list */
   private List<InsertRowNode> insertRowNodeList;
 
   public InsertRowsNode(PlanNodeId id) {
@@ -74,7 +75,7 @@ public class InsertRowsNode extends InsertNode {
     this.insertRowNodeList = insertRowNodeList;
   }
 
-  /** record the result of insert rows */
+  /** Record the result of insert rows */
   private Map<Integer, TSStatus> results = new HashMap<>();
 
   public List<Integer> getInsertRowNodeIndexList() {
@@ -227,14 +228,14 @@ public class InsertRowsNode extends InsertNode {
     List<TEndPoint> redirectInfo = new ArrayList<>();
     for (int i = 0; i < insertRowNodeList.size(); i++) {
       InsertRowNode insertRowNode = insertRowNodeList.get(i);
-      // data region for insert row node
+      // Data region for insert row node
       TRegionReplicaSet dataRegionReplicaSet =
           analysis
               .getDataPartitionInfo()
               .getDataRegionReplicaSetForWriting(
                   insertRowNode.devicePath.getFullPath(),
                   
TimePartitionUtils.getTimePartitionSlot(insertRowNode.getTime()));
-      // collect redirectInfo
+      // Collect redirectInfo
       
redirectInfo.add(dataRegionReplicaSet.getDataNodeLocations().get(0).getClientRpcEndPoint());
       InsertRowsNode tmpNode = splitMap.get(dataRegionReplicaSet);
       if (tmpNode != null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index e4bb921042b..9a75e464475 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -144,23 +145,23 @@ public class SubscriptionPrefetchingQueue {
       }
 
       if (event instanceof TabletInsertionEvent) {
-        Tablet tablet = convertToTablet((TabletInsertionEvent) event);
-        if (Objects.isNull(tablet)) {
+        final List<Tablet> tabletList = 
convertToTablets((TabletInsertionEvent) event);
+        if (Objects.isNull(tabletList) || tabletList.isEmpty()) {
           continue;
         }
-        tablets.add(tablet);
+        tablets.addAll(tabletList);
         enrichedEvents.add((EnrichedEvent) event);
         if (tablets.size() >= limit) {
           break;
         }
       } else if (event instanceof PipeTsFileInsertionEvent) {
-        for (TabletInsertionEvent tabletInsertionEvent :
+        for (final TabletInsertionEvent tabletInsertionEvent :
             ((PipeTsFileInsertionEvent) event).toTabletInsertionEvents()) {
-          Tablet tablet = convertToTablet(tabletInsertionEvent);
-          if (Objects.isNull(tablet)) {
+          final List<Tablet> tabletList = 
convertToTablets(tabletInsertionEvent);
+          if (Objects.isNull(tabletList) || tabletList.isEmpty()) {
             continue;
           }
-          tablets.add(tablet);
+          tablets.addAll(tabletList);
         }
         enrichedEvents.add((EnrichedEvent) event);
         if (tablets.size() >= limit) {
@@ -218,23 +219,24 @@ public class SubscriptionPrefetchingQueue {
     }
   }
 
-  /////////////////////////////// utility ///////////////////////////////
+  /////////////////////////////// Utility ///////////////////////////////
 
-  private Tablet convertToTablet(TabletInsertionEvent tabletInsertionEvent) {
+  private List<Tablet> convertToTablets(TabletInsertionEvent 
tabletInsertionEvent) {
     if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
-      return ((PipeInsertNodeTabletInsertionEvent) 
tabletInsertionEvent).convertToTablet();
+      return ((PipeInsertNodeTabletInsertionEvent) 
tabletInsertionEvent).convertToTablets();
     } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
-      return ((PipeRawTabletInsertionEvent) 
tabletInsertionEvent).convertToTablet();
+      return Collections.singletonList(
+          ((PipeRawTabletInsertionEvent) 
tabletInsertionEvent).convertToTablet());
     }
 
     LOGGER.warn(
         "Subscription: Only support convert PipeInsertNodeTabletInsertionEvent 
or PipeRawTabletInsertionEvent to tablet. Ignore {}.",
         tabletInsertionEvent);
-    return null;
+    return Collections.emptyList();
   }
 
   private String generateSubscriptionCommitId() {
-    // subscription commit id format: 
{DataNodeId}#{RebootTimes}#{TopicName}_{BrokerId}#{Id}
+    // Subscription commit id format: 
{DataNodeId}#{RebootTimes}#{TopicName}_{BrokerId}#{Id}
     // Recording data node ID and reboot times to address potential stale 
commit IDs caused by
     // leader transfers or restarts.
     return IoTDBDescriptor.getInstance().getConfig().getDataNodeId()
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
index 731d7e9e3db..718ff85527d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeStaticMeta.java
@@ -47,11 +47,11 @@ public class PipeStaticMeta {
   }
 
   public PipeStaticMeta(
-      String pipeName,
-      long creationTime,
-      Map<String, String> extractorAttributes,
-      Map<String, String> processorAttributes,
-      Map<String, String> connectorAttributes) {
+      final String pipeName,
+      final long creationTime,
+      final Map<String, String> extractorAttributes,
+      final Map<String, String> processorAttributes,
+      final Map<String, String> connectorAttributes) {
     this.pipeName = pipeName;
     this.creationTime = creationTime;
     extractorParameters = new PipeParameters(extractorAttributes);
@@ -90,28 +90,28 @@ public class PipeStaticMeta {
     return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
   }
 
-  public void serialize(OutputStream outputStream) throws IOException {
+  public void serialize(final OutputStream outputStream) throws IOException {
     ReadWriteIOUtils.write(pipeName, outputStream);
     ReadWriteIOUtils.write(creationTime, outputStream);
 
     ReadWriteIOUtils.write(extractorParameters.getAttribute().size(), 
outputStream);
-    for (Map.Entry<String, String> entry : 
extractorParameters.getAttribute().entrySet()) {
+    for (final Map.Entry<String, String> entry : 
extractorParameters.getAttribute().entrySet()) {
       ReadWriteIOUtils.write(entry.getKey(), outputStream);
       ReadWriteIOUtils.write(entry.getValue(), outputStream);
     }
     ReadWriteIOUtils.write(processorParameters.getAttribute().size(), 
outputStream);
-    for (Map.Entry<String, String> entry : 
processorParameters.getAttribute().entrySet()) {
+    for (final Map.Entry<String, String> entry : 
processorParameters.getAttribute().entrySet()) {
       ReadWriteIOUtils.write(entry.getKey(), outputStream);
       ReadWriteIOUtils.write(entry.getValue(), outputStream);
     }
     ReadWriteIOUtils.write(connectorParameters.getAttribute().size(), 
outputStream);
-    for (Map.Entry<String, String> entry : 
connectorParameters.getAttribute().entrySet()) {
+    for (final Map.Entry<String, String> entry : 
connectorParameters.getAttribute().entrySet()) {
       ReadWriteIOUtils.write(entry.getKey(), outputStream);
       ReadWriteIOUtils.write(entry.getValue(), outputStream);
     }
   }
 
-  public static PipeStaticMeta deserialize(InputStream inputStream) throws 
IOException {
+  public static PipeStaticMeta deserialize(final InputStream inputStream) 
throws IOException {
     final PipeStaticMeta pipeStaticMeta = new PipeStaticMeta();
 
     pipeStaticMeta.pipeName = ReadWriteIOUtils.readString(inputStream);
@@ -143,7 +143,7 @@ public class PipeStaticMeta {
     return pipeStaticMeta;
   }
 
-  public static PipeStaticMeta deserialize(ByteBuffer byteBuffer) {
+  public static PipeStaticMeta deserialize(final ByteBuffer byteBuffer) {
     final PipeStaticMeta pipeStaticMeta = new PipeStaticMeta();
 
     pipeStaticMeta.pipeName = ReadWriteIOUtils.readString(byteBuffer);
@@ -176,14 +176,14 @@ public class PipeStaticMeta {
   }
 
   @Override
-  public boolean equals(Object obj) {
+  public boolean equals(final Object obj) {
     if (this == obj) {
       return true;
     }
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    PipeStaticMeta that = (PipeStaticMeta) obj;
+    final PipeStaticMeta that = (PipeStaticMeta) obj;
     return pipeName.equals(that.pipeName)
         && creationTime == that.creationTime
         && extractorParameters.equals(that.extractorParameters)
@@ -218,7 +218,8 @@ public class PipeStaticMeta {
   public static final String SYSTEM_PIPE_PREFIX = "__";
   public static final String SUBSCRIPTION_PIPE_PREFIX = SYSTEM_PIPE_PREFIX + 
"subscription.";
 
-  public static String generateSubscriptionPipeName(String topicName, String 
consumerGroupId) {
+  public static String generateSubscriptionPipeName(
+      final String topicName, final String consumerGroupId) {
     return SUBSCRIPTION_PIPE_PREFIX + topicName + "_" + consumerGroupId;
   }
 }

Reply via email to