This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5973-1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f415a1d9e7b0b28d02201850b697da16f3f25b82
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jun 7 02:04:24 2023 +0800

    [IOTDB-5973] Pipe: fix ClassCastException when using pipe.core.event.view & 
support collector.pattern in historical collector (#10058)
    
    * remove pipe_connector_session_id in config
    
    * fix ClassCastException when using pipe.core.event.view
    
    * support collector.pattern in historical collector
    
    (cherry picked from commit 397f36edafdab5146f3ea9a7cd943fc3c4262e89)
---
 .../event/dml/insertion/TabletInsertionEvent.java  |  12 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |   9 -
 .../iotdb/commons/conf/CommonDescriptor.java       |   4 -
 .../iotdb/commons/pipe/config/PipeConfig.java      |   4 -
 .../PipeHistoricalDataRegionTsFileCollector.java   |  19 +-
 .../impl/iotdb/v1/IoTDBThriftConnectorV1.java      |  23 +-
 .../event/impl/PipeEmptyTabletInsertionEvent.java  |  39 --
 .../impl/PipeInsertNodeTabletInsertionEvent.java   |  32 +-
 ...Event.java => PipeRawTabletInsertionEvent.java} |  20 +-
 .../core/event/impl/PipeTsFileInsertionEvent.java  |   7 +-
 .../db/pipe/core/event/view/access/PipeRow.java    |  32 +-
 .../event/view/collector/PipeRowCollector.java     |  23 +-
 .../TabletInsertionDataContainer.java              | 179 +++---
 .../TsFileInsertionDataContainer.java              | 145 +++--
 .../TsFileInsertionDataTabletIterator.java         | 286 +++-------
 .../core/processor/PipeDoNothingProcessor.java     |  14 +-
 ...Test.java => PipeTabletInsertionEventTest.java} |  47 +-
 .../event/TsFileInsertionDataContainerTest.java    | 608 +++++++++++++++++++++
 .../iotdb/tsfile/read/TsFileSequenceReader.java    |  18 +
 19 files changed, 994 insertions(+), 527 deletions(-)

diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
index 4a8073a5a26..09b129a9cc4 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java
@@ -32,16 +32,16 @@ public interface TabletInsertionEvent extends Event {
   /**
    * The consumer processes the data row by row and collects the results by 
RowCollector.
    *
-   * @return TabletInsertionEvent a new TabletInsertionEvent contains the 
results collected by the
-   *     RowCollector
+   * @return Iterable<TabletInsertionEvent> an iterable of new TabletInsertionEvents 
containing the results
+   *     collected by the RowCollector
    */
-  TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer);
+  Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, RowCollector> 
consumer);
 
   /**
    * The consumer processes the Tablet directly and collects the results by 
RowCollector.
    *
-   * @return TabletInsertionEvent a new TabletInsertionEvent contains the 
results collected by the
-   *     RowCollector
+   * @return Iterable<TabletInsertionEvent> an iterable of new TabletInsertionEvents 
containing the results
+   *     collected by the RowCollector
    */
-  TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> 
consumer);
+  Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, 
RowCollector> consumer);
 }
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 4f70504681e..b01128cefcc 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -156,7 +156,6 @@ public class CommonConfig {
   private int pipeConnectorReadFileBufferSize = 8388608;
   private long pipeConnectorRetryIntervalMs = 1000L;
   private int pipeConnectorPendingQueueSize = 1024;
-  private long pipeConnectorSessionId = Long.MAX_VALUE / 2;
 
   private int pipeHeartbeatLoopCyclesForCollectingPipeMeta = 100;
   private long pipeMetaSyncerInitialSyncDelayMinutes = 3;
@@ -551,12 +550,4 @@ public class CommonConfig {
     this.pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs =
         pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs;
   }
-
-  public long getPipeConnectorSessionId() {
-    return pipeConnectorSessionId;
-  }
-
-  public void setPipeConnectorSessionId(long pipeSessionId) {
-    this.pipeConnectorSessionId = pipeSessionId;
-  }
 }
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index e45c4af45c5..bd13ad3c53b 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -285,10 +285,6 @@ public class CommonDescriptor {
             properties.getProperty(
                 "pipe_connector_pending_queue_size",
                 String.valueOf(config.getPipeConnectorPendingQueueSize()))));
-    config.setPipeConnectorSessionId(
-        Long.parseLong(
-            properties.getProperty(
-                "pipe_connector_session_id", 
String.valueOf(config.getPipeConnectorSessionId()))));
 
     config.setPipeHeartbeatLoopCyclesForCollectingPipeMeta(
         Integer.parseInt(
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index af0fcd379ff..365ea0b8003 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -85,10 +85,6 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeConnectorPendingQueueSize();
   }
 
-  public long getPipeConnectorSessionId() {
-    return COMMON_CONFIG.getPipeConnectorSessionId();
-  }
-
   /////////////////////////////// Meta Consistency 
///////////////////////////////
 
   public int getHeartbeatLoopCyclesForCollectingPipeMeta() {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index a1742f9fce3..583cbd5d853 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -56,6 +56,7 @@ public class PipeHistoricalDataRegionTsFileCollector extends 
PipeHistoricalDataR
   private final PipeTaskMeta pipeTaskMeta;
   private final ProgressIndex startIndex;
 
+  private String pattern;
   private int dataRegionId;
 
   private final long historicalDataCollectionTimeLowerBound;
@@ -80,6 +81,10 @@ public class PipeHistoricalDataRegionTsFileCollector extends 
PipeHistoricalDataR
   @Override
   public void customize(
       PipeParameters parameters, PipeCollectorRuntimeConfiguration 
configuration) {
+    pattern =
+        parameters.getStringOrDefault(
+            PipeCollectorConstant.COLLECTOR_PATTERN_KEY,
+            PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE);
     dataRegionId = parameters.getInt(DATA_REGION_KEY);
     historicalDataCollectionStartTime =
         parameters.hasAttribute(COLLECTOR_HISTORY_START_TIME)
@@ -153,12 +158,7 @@ public class PipeHistoricalDataRegionTsFileCollector 
extends PipeHistoricalDataR
                         
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
                             && 
isTsFileResourceOverlappedWithTimeRange(resource)
                             && 
isTsFileGeneratedAfterCollectionTimeLowerBound(resource))
-                .map(
-                    resource ->
-                        new PipeTsFileInsertionEvent(
-                            resource,
-                            pipeTaskMeta,
-                            
PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE))
+                .map(resource -> new PipeTsFileInsertionEvent(resource, 
pipeTaskMeta, pattern))
                 .collect(Collectors.toList()));
         pendingQueue.addAll(
             tsFileManager.getTsFileList(false).stream()
@@ -167,12 +167,7 @@ public class PipeHistoricalDataRegionTsFileCollector 
extends PipeHistoricalDataR
                         
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
                             && 
isTsFileResourceOverlappedWithTimeRange(resource)
                             && 
isTsFileGeneratedAfterCollectionTimeLowerBound(resource))
-                .map(
-                    resource ->
-                        new PipeTsFileInsertionEvent(
-                            resource,
-                            pipeTaskMeta,
-                            
PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE))
+                .map(resource -> new PipeTsFileInsertionEvent(resource, 
pipeTaskMeta, pattern))
                 .collect(Collectors.toList()));
         pendingQueue.forEach(
             event ->
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
index c67e3966cc2..4c0b1c0e54e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
@@ -33,9 +33,8 @@ import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransfe
 import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferHandshakeReq;
 import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq;
 import 
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferTabletReq;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.PipeConnector;
@@ -120,13 +119,11 @@ public class IoTDBThriftConnectorV1 implements 
PipeConnector {
     try {
       if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
         doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
-      } else if (tabletInsertionEvent instanceof 
PipeTabletTabletInsertionEvent) {
-        doTransfer((PipeTabletTabletInsertionEvent) tabletInsertionEvent);
-      } else if (tabletInsertionEvent instanceof 
PipeEmptyTabletInsertionEvent) {
-        doTransfer((PipeEmptyTabletInsertionEvent) tabletInsertionEvent);
+      } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
+        doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent);
       } else {
         throw new NotImplementedException(
-            "IoTDBThriftConnectorV1 only support 
PipeInsertNodeTabletInsertionEvent and PipeTabletTabletInsertionEvent.");
+            "IoTDBThriftConnectorV1 only support 
PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent.");
       }
     } catch (TException e) {
       LOGGER.error(
@@ -154,25 +151,21 @@ public class IoTDBThriftConnectorV1 implements 
PipeConnector {
     }
   }
 
-  private void doTransfer(PipeTabletTabletInsertionEvent 
pipeTabletTabletInsertionEvent)
+  private void doTransfer(PipeRawTabletInsertionEvent 
pipeRawTabletInsertionEvent)
       throws PipeException, TException, IOException {
     final TPipeTransferResp resp =
         client.pipeTransfer(
             PipeTransferTabletReq.toTPipeTransferReq(
-                pipeTabletTabletInsertionEvent.convertToTablet()));
+                pipeRawTabletInsertionEvent.convertToTablet()));
 
     if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       throw new PipeException(
           String.format(
-              "Transfer PipeTabletTabletInsertionEvent %s error, result status 
%s",
-              pipeTabletTabletInsertionEvent, resp.status));
+              "Transfer PipeRawTabletInsertionEvent %s error, result status 
%s",
+              pipeRawTabletInsertionEvent, resp.status));
     }
   }
 
-  private void doTransfer(PipeEmptyTabletInsertionEvent 
pipeEmptyTabletInsertionEvent) {
-    // do nothing
-  }
-
   @Override
   public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
     // PipeProcessor can change the type of TabletInsertionEvent
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java
deleted file mode 100644
index 855da8fa8b0..00000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.core.event.impl;
-
-import org.apache.iotdb.pipe.api.access.Row;
-import org.apache.iotdb.pipe.api.collector.RowCollector;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.tsfile.write.record.Tablet;
-
-import java.util.function.BiConsumer;
-
-public class PipeEmptyTabletInsertionEvent implements TabletInsertionEvent {
-  @Override
-  public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> 
consumer) {
-    return this;
-  }
-
-  @Override
-  public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> 
consumer) {
-    return this;
-  }
-}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java
index 416fea8b734..dd65ad4c319 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.core.event.impl;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
-import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.core.event.view.datastructure.TabletInsertionDataContainer;
@@ -116,7 +115,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   /////////////////////////// TabletInsertionEvent ///////////////////////////
 
   @Override
-  public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> 
consumer) {
+  public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, 
RowCollector> consumer) {
     try {
       if (dataContainer == null) {
         dataContainer = new TabletInsertionDataContainer(getInsertNode(), 
getPattern());
@@ -129,7 +128,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
   }
 
   @Override
-  public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> 
consumer) {
+  public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, 
RowCollector> consumer) {
     try {
       if (dataContainer == null) {
         dataContainer = new TabletInsertionDataContainer(getInsertNode(), 
getPattern());
@@ -141,36 +140,11 @@ public class PipeInsertNodeTabletInsertionEvent extends 
EnrichedEvent
     }
   }
 
-  public Tablet convertToTablet() {
-    try {
-      if (dataContainer == null) {
-        dataContainer = new TabletInsertionDataContainer(getInsertNode(), 
getPattern());
-      }
-      return dataContainer.convertToTablet();
-    } catch (Exception e) {
-      LOGGER.error("Process tablet error.", e);
-      throw new PipeException("Process tablet error.", e);
-    }
-  }
-
-  @TestOnly
-  public Tablet convertToTabletForTest(InsertNode insertNode, String pattern) {
-    try {
-      if (dataContainer == null) {
-        dataContainer = new TabletInsertionDataContainer(insertNode, pattern);
-      }
-      return dataContainer.convertToTablet();
-    } catch (Exception e) {
-      LOGGER.error("Process tablet error.", e);
-      throw new PipeException("Process tablet error.", e);
-    }
-  }
-
   /////////////////////////// Object ///////////////////////////
 
   @Override
   public String toString() {
-    return "PipeTabletTabletInsertionEvent{"
+    return "PipeRawTabletInsertionEvent{"
         + "walEntryHandler="
         + walEntryHandler
         + ", progressIndex="
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeRawTabletInsertionEvent.java
similarity index 74%
rename from 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java
rename to 
server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeRawTabletInsertionEvent.java
index 014972abd7f..abb33534eb8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeRawTabletInsertionEvent.java
@@ -29,18 +29,18 @@ import org.apache.iotdb.tsfile.write.record.Tablet;
 import java.util.Objects;
 import java.util.function.BiConsumer;
 
-public class PipeTabletTabletInsertionEvent implements TabletInsertionEvent {
+public class PipeRawTabletInsertionEvent implements TabletInsertionEvent {
 
   private final Tablet tablet;
   private final String pattern;
 
   private TabletInsertionDataContainer dataContainer;
 
-  public PipeTabletTabletInsertionEvent(Tablet tablet) {
+  public PipeRawTabletInsertionEvent(Tablet tablet) {
     this(Objects.requireNonNull(tablet), null);
   }
 
-  public PipeTabletTabletInsertionEvent(Tablet tablet, String pattern) {
+  public PipeRawTabletInsertionEvent(Tablet tablet, String pattern) {
     this.tablet = Objects.requireNonNull(tablet);
     this.pattern = pattern;
   }
@@ -52,7 +52,7 @@ public class PipeTabletTabletInsertionEvent implements 
TabletInsertionEvent {
   /////////////////////////// TabletInsertionEvent ///////////////////////////
 
   @Override
-  public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> 
consumer) {
+  public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, 
RowCollector> consumer) {
     if (dataContainer == null) {
       dataContainer = new TabletInsertionDataContainer(tablet, getPattern());
     }
@@ -60,7 +60,7 @@ public class PipeTabletTabletInsertionEvent implements 
TabletInsertionEvent {
   }
 
   @Override
-  public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> 
consumer) {
+  public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, 
RowCollector> consumer) {
     if (dataContainer == null) {
       dataContainer = new TabletInsertionDataContainer(tablet, getPattern());
     }
@@ -68,8 +68,16 @@ public class PipeTabletTabletInsertionEvent implements 
TabletInsertionEvent {
   }
 
   public Tablet convertToTablet() {
+    final String pattern = getPattern();
+
+    // if pattern is "root", we don't need to convert, just return the 
original tablet
+    if (pattern.equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) 
{
+      return tablet;
+    }
+
+    // if pattern is not "root", we need to convert the tablet
     if (dataContainer == null) {
-      dataContainer = new TabletInsertionDataContainer(tablet, getPattern());
+      dataContainer = new TabletInsertionDataContainer(tablet, pattern);
     }
     return dataContainer.convertToTablet();
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
index c813619be9a..2de0cd31d19 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class PipeTsFileInsertionEvent extends EnrichedEvent implements 
TsFileInsertionEvent {
@@ -152,12 +153,16 @@ public class PipeTsFileInsertionEvent extends 
EnrichedEvent implements TsFileIns
       }
       return dataContainer.toTabletInsertionEvents();
     } catch (InterruptedException e) {
-      String errorMsg =
+      final String errorMsg =
           String.format(
               "Interrupted when waiting for closing TsFile %s.", 
resource.getTsFilePath());
       LOGGER.warn(errorMsg);
       Thread.currentThread().interrupt();
       throw new PipeException(errorMsg);
+    } catch (IOException e) {
+      final String errorMsg = String.format("Read TsFile %s error.", 
resource.getTsFilePath());
+      LOGGER.warn(errorMsg);
+      throw new PipeException(errorMsg);
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
index dfab1e1aa2b..85f4a210ccb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.pipe.api.type.Binary;
 import org.apache.iotdb.pipe.api.type.Type;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.util.Arrays;
@@ -39,8 +40,9 @@ public class PipeRow implements Row {
   private final MeasurementSchema[] measurementSchemaList;
 
   private final long[] timestampColumn;
-  private final Object[] valueColumns;
   private final TSDataType[] valueColumnTypes;
+  private final Object[] valueColumns;
+  private final BitMap[] bitMaps;
 
   private final String[] columnNameStringList;
 
@@ -49,15 +51,17 @@ public class PipeRow implements Row {
       String deviceId,
       MeasurementSchema[] measurementSchemaList,
       long[] timestampColumn,
-      Object[] valueColumns,
       TSDataType[] valueColumnTypes,
+      Object[] valueColumns,
+      BitMap[] bitMaps,
       String[] columnNameStringList) {
     this.rowIndex = rowIndex;
     this.deviceId = deviceId;
     this.measurementSchemaList = measurementSchemaList;
     this.timestampColumn = timestampColumn;
-    this.valueColumns = valueColumns;
     this.valueColumnTypes = valueColumnTypes;
+    this.valueColumns = valueColumns;
+    this.bitMaps = bitMaps;
     this.columnNameStringList = columnNameStringList;
   }
 
@@ -103,7 +107,25 @@ public class PipeRow implements Row {
 
   @Override
   public Object getObject(int columnIndex) {
-    return ((Object[]) valueColumns[columnIndex])[rowIndex];
+    switch (getDataType(columnIndex)) {
+      case INT32:
+        return getInt(columnIndex);
+      case INT64:
+        return getLong(columnIndex);
+      case FLOAT:
+        return getFloat(columnIndex);
+      case DOUBLE:
+        return getDouble(columnIndex);
+      case BOOLEAN:
+        return getBoolean(columnIndex);
+      case TEXT:
+        return getBinary(columnIndex);
+      default:
+        throw new UnsupportedOperationException(
+            String.format(
+                "unsupported data type %s for column %s",
+                getDataType(columnIndex), columnNameStringList[columnIndex]));
+    }
   }
 
   @Override
@@ -113,7 +135,7 @@ public class PipeRow implements Row {
 
   @Override
   public boolean isNull(int columnIndex) {
-    return ((Object[]) valueColumns[columnIndex])[rowIndex] == null;
+    return bitMaps[columnIndex].isMarked(rowIndex);
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
index ab0371252f3..8010b67e57e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java
@@ -19,8 +19,7 @@
 
 package org.apache.iotdb.db.pipe.core.event.view.collector;
 
-import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.core.event.view.access.PipeRow;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
@@ -35,6 +34,7 @@ import java.util.List;
 
 public class PipeRowCollector implements RowCollector {
 
+  private final List<TabletInsertionEvent> tabletInsertionEventList = new 
ArrayList<>();
   private Tablet tablet = null;
 
   @Override
@@ -63,16 +63,21 @@ public class PipeRowCollector implements RowCollector {
       }
     }
     tablet.rowSize++;
-  }
 
-  public TabletInsertionEvent toTabletInsertionEvent() {
-    if (tablet == null) {
-      return new PipeEmptyTabletInsertionEvent();
+    if (tablet.rowSize == tablet.getMaxRowNumber()) {
+      collectTabletInsertionEvent();
     }
+  }
 
-    PipeTabletTabletInsertionEvent tabletInsertionEvent =
-        new PipeTabletTabletInsertionEvent(tablet);
+  private void collectTabletInsertionEvent() {
+    if (tablet != null) {
+      tabletInsertionEventList.add(new PipeRawTabletInsertionEvent(tablet));
+    }
     this.tablet = null;
-    return tabletInsertionEvent;
+  }
+
+  public Iterable<TabletInsertionEvent> convertToTabletInsertionEvents() {
+    collectTabletInsertionEvent();
+    return tabletInsertionEventList;
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
index 16d21f73138..542847074c0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java
@@ -23,7 +23,6 @@ import 
org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
-import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.core.event.view.access.PipeRow;
 import org.apache.iotdb.db.pipe.core.event.view.collector.PipeRowCollector;
 import org.apache.iotdb.pipe.api.access.Row;
@@ -40,6 +39,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.BiConsumer;
@@ -52,8 +52,9 @@ public class TabletInsertionDataContainer {
   private String[] columnNameStringList;
 
   private long[] timestampColumn;
-  private Object[] valueColumns;
   private TSDataType[] valueColumnTypes;
+  // each element is one column's value array: a primitive array, or Binary[] for TEXT
+  private Object[] valueColumns;
   private BitMap[] nullValueColumnBitmaps;
   private int rowCount;
 
@@ -89,7 +90,8 @@ public class TabletInsertionDataContainer {
     this.timestampColumn = new long[] {insertRowNode.getTime()};
 
     generateColumnIndexMapper(
-        insertRowNode, pattern, 
originColumnIndex2FilteredColumnIndexMapperList);
+        insertRowNode.getMeasurements(), pattern, 
originColumnIndex2FilteredColumnIndexMapperList);
+
     final int filteredColumnSize =
         Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList)
             .filter(Objects::nonNull)
@@ -98,22 +100,46 @@ public class TabletInsertionDataContainer {
 
     this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
     this.columnNameStringList = new String[filteredColumnSize];
-    this.valueColumns = new Object[filteredColumnSize];
     this.valueColumnTypes = new TSDataType[filteredColumnSize];
+    this.valueColumns = new Object[filteredColumnSize];
     this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];
 
     final MeasurementSchema[] originMeasurementSchemaList = 
insertRowNode.getMeasurementSchemas();
     final String[] originColumnNameStringList = 
insertRowNode.getMeasurements();
-    final Object[] originValueColumns = insertRowNode.getValues();
     final TSDataType[] originValueColumnTypes = insertRowNode.getDataTypes();
+    final Object[] originValueColumns = insertRowNode.getValues();
 
     for (int i = 0; i < 
originColumnIndex2FilteredColumnIndexMapperList.length; i++) {
       if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) {
         final int filteredColumnIndex = 
originColumnIndex2FilteredColumnIndexMapperList[i];
         this.measurementSchemaList[filteredColumnIndex] = 
originMeasurementSchemaList[i];
         this.columnNameStringList[filteredColumnIndex] = 
originColumnNameStringList[i];
-        this.valueColumns[filteredColumnIndex] = originValueColumns[i];
         this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i];
+        switch (originValueColumnTypes[i]) {
+          case INT32:
+            this.valueColumns[filteredColumnIndex] = new int[] {(Integer) 
originValueColumns[i]};
+            break;
+          case INT64:
+            this.valueColumns[filteredColumnIndex] = new long[] {(Long) 
originValueColumns[i]};
+            break;
+          case FLOAT:
+            this.valueColumns[filteredColumnIndex] = new float[] {(Float) 
originValueColumns[i]};
+            break;
+          case DOUBLE:
+            this.valueColumns[filteredColumnIndex] = new double[] {(Double) 
originValueColumns[i]};
+            break;
+          case BOOLEAN:
+            this.valueColumns[filteredColumnIndex] =
+                new boolean[] {(Boolean) originValueColumns[i]};
+            break;
+          case TEXT:
+            this.valueColumns[filteredColumnIndex] = new Binary[] {(Binary) 
originValueColumns[i]};
+            break;
+          default:
+            throw new UnSupportedDataTypeException(
+                String.format(
+                    "Data type %s is not supported.", 
originValueColumnTypes[i].toString()));
+        }
         this.nullValueColumnBitmaps[filteredColumnIndex] = new BitMap(1);
       }
     }
@@ -130,7 +156,9 @@ public class TabletInsertionDataContainer {
     this.timestampColumn = insertTabletNode.getTimes();
 
     generateColumnIndexMapper(
-        insertTabletNode, pattern, 
originColumnIndex2FilteredColumnIndexMapperList);
+        insertTabletNode.getMeasurements(),
+        pattern,
+        originColumnIndex2FilteredColumnIndexMapperList);
 
     final int filteredColumnSize =
         Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList)
@@ -140,15 +168,15 @@ public class TabletInsertionDataContainer {
 
     this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
     this.columnNameStringList = new String[filteredColumnSize];
-    this.valueColumns = new Object[filteredColumnSize];
     this.valueColumnTypes = new TSDataType[filteredColumnSize];
+    this.valueColumns = new Object[filteredColumnSize];
     this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];
 
     final MeasurementSchema[] originMeasurementSchemaList =
         insertTabletNode.getMeasurementSchemas();
     final String[] originColumnNameStringList = 
insertTabletNode.getMeasurements();
-    final Object[] originValueColumns = insertTabletNode.getColumns();
     final TSDataType[] originValueColumnTypes = 
insertTabletNode.getDataTypes();
+    final Object[] originValueColumns = insertTabletNode.getColumns();
     final BitMap[] originBitMapList =
         (insertTabletNode.getBitMaps() == null
             ? IntStream.range(0, originColumnSize)
@@ -156,15 +184,19 @@ public class TabletInsertionDataContainer {
                 .map(o -> new BitMap(timestampColumn.length))
                 .toArray(BitMap[]::new)
             : insertTabletNode.getBitMaps());
+    for (int i = 0; i < originBitMapList.length; i++) {
+      if (originBitMapList[i] == null) {
+        originBitMapList[i] = new BitMap(timestampColumn.length);
+      }
+    }
 
     for (int i = 0; i < 
originColumnIndex2FilteredColumnIndexMapperList.length; i++) {
       if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) {
         final int filteredColumnIndex = 
originColumnIndex2FilteredColumnIndexMapperList[i];
         this.measurementSchemaList[filteredColumnIndex] = 
originMeasurementSchemaList[i];
         this.columnNameStringList[filteredColumnIndex] = 
originColumnNameStringList[i];
-        this.valueColumns[filteredColumnIndex] =
-            convertToColumn(originValueColumns[i], originValueColumnTypes[i], 
originBitMapList[i]);
         this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i];
+        this.valueColumns[filteredColumnIndex] = originValueColumns[i];
         this.nullValueColumnBitmaps[filteredColumnIndex] = originBitMapList[i];
       }
     }
@@ -179,7 +211,13 @@ public class TabletInsertionDataContainer {
     this.deviceId = tablet.deviceId;
     this.timestampColumn = tablet.timestamps;
 
-    generateColumnIndexMapper(tablet, pattern, 
originColumnIndex2FilteredColumnIndexMapperList);
+    final List<MeasurementSchema> originMeasurementSchemaList = 
tablet.getSchemas();
+    final String[] originMeasurementList = new 
String[originMeasurementSchemaList.size()];
+    for (int i = 0; i < originMeasurementSchemaList.size(); i++) {
+      originMeasurementList[i] = 
originMeasurementSchemaList.get(i).getMeasurementId();
+    }
+    generateColumnIndexMapper(
+        originMeasurementList, pattern, 
originColumnIndex2FilteredColumnIndexMapperList);
 
     final int filteredColumnSize =
         Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList)
@@ -189,11 +227,10 @@ public class TabletInsertionDataContainer {
 
     this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
     this.columnNameStringList = new String[filteredColumnSize];
-    this.valueColumns = new Object[filteredColumnSize];
     this.valueColumnTypes = new TSDataType[filteredColumnSize];
+    this.valueColumns = new Object[filteredColumnSize];
     this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];
 
-    final List<MeasurementSchema> originMeasurementSchemaList = 
tablet.getSchemas();
     final String[] originColumnNameStringList = new String[originColumnSize];
     final TSDataType[] originValueColumnTypes = new 
TSDataType[originColumnSize];
     for (int i = 0; i < originColumnSize; i++) {
@@ -201,16 +238,26 @@ public class TabletInsertionDataContainer {
       originValueColumnTypes[i] = originMeasurementSchemaList.get(i).getType();
     }
     final Object[] originValueColumns = tablet.values;
-    final BitMap[] originBitMapList = tablet.bitMaps;
+    final BitMap[] originBitMapList =
+        tablet.bitMaps == null
+            ? IntStream.range(0, originColumnSize)
+                .boxed()
+                .map(o -> new BitMap(timestampColumn.length))
+                .toArray(BitMap[]::new)
+            : tablet.bitMaps;
+    for (int i = 0; i < originBitMapList.length; i++) {
+      if (originBitMapList[i] == null) {
+        originBitMapList[i] = new BitMap(timestampColumn.length);
+      }
+    }
 
     for (int i = 0; i < 
originColumnIndex2FilteredColumnIndexMapperList.length; i++) {
       if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) {
         final int filteredColumnIndex = 
originColumnIndex2FilteredColumnIndexMapperList[i];
         this.measurementSchemaList[filteredColumnIndex] = 
originMeasurementSchemaList.get(i);
         this.columnNameStringList[filteredColumnIndex] = 
originColumnNameStringList[i];
-        this.valueColumns[filteredColumnIndex] =
-            convertToColumn(originValueColumns[i], originValueColumnTypes[i], 
originBitMapList[i]);
         this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i];
+        this.valueColumns[filteredColumnIndex] = originValueColumns[i];
         this.nullValueColumnBitmaps[filteredColumnIndex] = originBitMapList[i];
       }
     }
@@ -218,6 +265,7 @@ public class TabletInsertionDataContainer {
     rowCount = tablet.rowSize;
   }
 
+  // TODO: cache the result keyed by deviceId to improve performance
   private void generateColumnIndexMapper(
       String[] originMeasurementList,
       String pattern,
@@ -243,7 +291,6 @@ public class TabletInsertionDataContainer {
         // low cost check comes first
         if (pattern.length() == deviceId.length() + measurement.length() + 1
             // high cost check comes later
-            && pattern.startsWith(deviceId)
             && pattern.endsWith(TsFileConstant.PATH_SEPARATOR + measurement)) {
           originColumnIndex2FilteredColumnIndexMapperList[i] = filteredCount++;
         }
@@ -251,107 +298,34 @@ public class TabletInsertionDataContainer {
     }
   }
 
-  private void generateColumnIndexMapper(
-      InsertNode insertNode,
-      String pattern,
-      Integer[] originColumnIndex2FilteredColumnIndexMapperList) {
-    generateColumnIndexMapper(
-        insertNode.getMeasurements(), pattern, 
originColumnIndex2FilteredColumnIndexMapperList);
-  }
-
-  private void generateColumnIndexMapper(
-      Tablet tablet, String pattern, Integer[] 
originColumnIndex2FilteredColumnIndexMapperList) {
-    final List<MeasurementSchema> originMeasurementSchemaList = 
tablet.getSchemas();
-    final String[] originMeasurementList = new 
String[originMeasurementSchemaList.size()];
-    for (int i = 0; i < originMeasurementSchemaList.size(); i++) {
-      originMeasurementList[i] = 
originMeasurementSchemaList.get(i).getMeasurementId();
-    }
-    generateColumnIndexMapper(
-        originMeasurementList, pattern, 
originColumnIndex2FilteredColumnIndexMapperList);
-  }
-
-  private Object convertToColumn(Object originColumn, TSDataType dataType, 
BitMap bitMap) {
-    switch (dataType) {
-      case INT32:
-        final int[] intValues = (int[]) originColumn;
-        final int[] integerValues = new int[intValues.length];
-        for (int i = 0; i < intValues.length; i++) {
-          integerValues[i] = bitMap != null && bitMap.isMarked(i) ? 0 : 
intValues[i];
-        }
-        return integerValues;
-      case INT64:
-        final long[] longValues = (long[]) originColumn;
-        final long[] longValues2 = new long[longValues.length];
-        for (int i = 0; i < longValues.length; i++) {
-          longValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 : 
longValues[i];
-        }
-        return longValues2;
-      case FLOAT:
-        final float[] floatValues = (float[]) originColumn;
-        final float[] floatValues2 = new float[floatValues.length];
-        for (int i = 0; i < floatValues.length; i++) {
-          floatValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 : 
floatValues[i];
-        }
-        return floatValues2;
-      case DOUBLE:
-        final double[] doubleValues = (double[]) originColumn;
-        final double[] doubleValues2 = new double[doubleValues.length];
-        for (int i = 0; i < doubleValues.length; i++) {
-          doubleValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 : 
doubleValues[i];
-        }
-        return doubleValues2;
-      case BOOLEAN:
-        final boolean[] booleanValues = (boolean[]) originColumn;
-        final boolean[] booleanValues2 = new boolean[booleanValues.length];
-        for (int i = 0; i < booleanValues.length; i++) {
-          booleanValues2[i] = (bitMap == null || !bitMap.isMarked(i)) && 
booleanValues[i];
-        }
-        return booleanValues2;
-      case TEXT:
-        final Binary[] binaryValues = (Binary[]) originColumn;
-        final Binary[] stringValues = new Binary[binaryValues.length];
-        for (int i = 0; i < binaryValues.length; i++) {
-          stringValues[i] =
-              bitMap != null && bitMap.isMarked(i)
-                  ? null
-                  : (binaryValues[i] == null
-                      ? null
-                      : Binary.valueOf(binaryValues[i].getStringValue()));
-        }
-        return stringValues;
-      default:
-        throw new UnSupportedDataTypeException(
-            String.format("Data type %s is not supported.", dataType));
-    }
-  }
-
   ////////////////////////////  process  ////////////////////////////
 
-  public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> 
consumer) {
-    final PipeRowCollector rowCollector = new PipeRowCollector();
-    if (valueColumns.length == 0) {
-      return new PipeEmptyTabletInsertionEvent();
+  public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, 
RowCollector> consumer) {
+    if (valueColumns.length == 0 || timestampColumn.length == 0) {
+      return Collections.emptyList();
     }
 
-    for (int i = 0; i < timestampColumn.length; i++) {
+    final PipeRowCollector rowCollector = new PipeRowCollector();
+    for (int i = 0; i < rowCount; i++) {
       consumer.accept(
           new PipeRow(
               i,
               deviceId,
               measurementSchemaList,
               timestampColumn,
-              valueColumns,
               valueColumnTypes,
+              valueColumns,
+              nullValueColumnBitmaps,
               columnNameStringList),
           rowCollector);
     }
-    return rowCollector.toTabletInsertionEvent();
+    return rowCollector.convertToTabletInsertionEvents();
   }
 
-  public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> 
consumer) {
+  public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, 
RowCollector> consumer) {
     final PipeRowCollector rowCollector = new PipeRowCollector();
     consumer.accept(convertToTablet(), rowCollector);
-    return rowCollector.toTabletInsertionEvent();
+    return rowCollector.convertToTabletInsertionEvents();
   }
 
   ////////////////////////////  convert  ////////////////////////////
@@ -362,10 +336,8 @@ public class TabletInsertionDataContainer {
     }
 
     final int columnSize = measurementSchemaList.length;
-
     final List<MeasurementSchema> measurementSchemaArrayList =
         new ArrayList<>(Arrays.asList(measurementSchemaList).subList(0, 
columnSize));
-
     final Tablet newTablet = new Tablet(deviceId, measurementSchemaArrayList, 
rowCount);
     newTablet.timestamps = timestampColumn;
     newTablet.bitMaps = nullValueColumnBitmaps;
@@ -373,6 +345,7 @@ public class TabletInsertionDataContainer {
     newTablet.rowSize = rowCount;
 
     tablet = newTablet;
+
     return tablet;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
index 9035a8fe0bb..79274e48fc7 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java
@@ -19,13 +19,13 @@
 
 package org.apache.iotdb.db.pipe.core.event.view.datastructure;
 
-import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TsFileReader;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.write.record.Tablet;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,100 +37,127 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
-public class TsFileInsertionDataContainer {
+public class TsFileInsertionDataContainer implements AutoCloseable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TsFileInsertionDataContainer.class);
 
-  private final File tsFile;
   private final String pattern;
 
-  private TimeseriesMetadata vectorTimeseriesMetadata;
+  private final TsFileSequenceReader tsFileSequenceReader;
+  private final TsFileReader tsFileReader;
 
-  private final Map<String, List<TimeseriesMetadata>> 
device2TimeseriesMetadataMap;
+  private final Iterator<Map.Entry<String, List<String>>> 
deviceMeasurementsMapIterator;
+  private final Map<String, TSDataType> measurementDataTypeMap;
 
-  public TsFileInsertionDataContainer(File tsFile, String pattern) {
-    this.tsFile = tsFile;
+  public TsFileInsertionDataContainer(File tsFile, String pattern) throws 
IOException {
     this.pattern = pattern;
 
-    this.device2TimeseriesMetadataMap = collectDevice2TimeseriesMetadataMap();
+    tsFileSequenceReader = new TsFileSequenceReader(tsFile.getAbsolutePath());
+    tsFileReader = new TsFileReader(tsFileSequenceReader);
+
+    deviceMeasurementsMapIterator = 
filterDeviceMeasurementsMapByPattern().entrySet().iterator();
+    measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap();
   }
 
-  private Map<String, List<TimeseriesMetadata>> 
collectDevice2TimeseriesMetadataMap() {
-    final Map<String, List<TimeseriesMetadata>> result = new HashMap<>();
+  private Map<String, List<String>> filterDeviceMeasurementsMapByPattern() 
throws IOException {
+    final Map<String, List<String>> filteredDeviceMeasurementsMap = new 
HashMap<>();
 
-    try (TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFile.getPath())) {
-      // match pattern
-      for (Map.Entry<String, List<TimeseriesMetadata>> entry :
-          reader.getAllTimeseriesMetadata(true).entrySet()) {
-        final String device = entry.getKey();
-        boolean isVector = false;
+    for (Map.Entry<String, List<String>> entry :
+        tsFileSequenceReader.getDeviceMeasurementsMap().entrySet()) {
+      final String deviceId = entry.getKey();
 
-        // case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c
-        // in this case, all data can be matched without checking the measurements
-        if (pattern == null || pattern.length() <= device.length() && 
device.startsWith(pattern)) {
-          result.put(device, entry.getValue());
+      // case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c
+      // in this case, all data can be matched without checking the measurements
+      if (pattern == null
+          || pattern.length() <= deviceId.length() && 
deviceId.startsWith(pattern)) {
+        if (!entry.getValue().isEmpty()) {
+          filteredDeviceMeasurementsMap.put(deviceId, entry.getValue());
         }
+      }
 
-        // case 2: for example, pattern is root.a.b.c and device is root.a.b
-        // in this case, we need to check the full path
-        else {
-          final List<TimeseriesMetadata> timeseriesMetadataList = new 
ArrayList<>();
-
-          for (TimeseriesMetadata timeseriesMetadata : entry.getValue()) {
-            // TODO: test me!!!
-            if (timeseriesMetadata.getTSDataType() == TSDataType.VECTOR) {
-              vectorTimeseriesMetadata = timeseriesMetadata;
-              isVector = false;
-              continue;
-            }
-
-            final String measurement = timeseriesMetadata.getMeasurementId();
-            // low cost check comes first
-            if (pattern.length() == measurement.length() + device.length() + 1
-                // high cost check comes later
-                && pattern.startsWith(device)
-                && pattern.endsWith(TsFileConstant.PATH_SEPARATOR + 
measurement)) {
-              if (!isVector) {
-                isVector = true;
-                timeseriesMetadataList.add(vectorTimeseriesMetadata);
-              }
-              timeseriesMetadataList.add(timeseriesMetadata);
-            }
+      // case 2: for example, pattern is root.a.b.c and device is root.a.b
+      // in this case, we need to check the full path
+      else if (pattern.length() > deviceId.length() && 
pattern.startsWith(deviceId)) {
+        final List<String> filteredMeasurements = new ArrayList<>();
+
+        for (final String measurement : entry.getValue()) {
+          // low cost check comes first
+          if (pattern.length() == deviceId.length() + measurement.length() + 1
+              // high cost check comes later
+              && pattern.endsWith(TsFileConstant.PATH_SEPARATOR + 
measurement)) {
+            filteredMeasurements.add(measurement);
           }
+        }
 
-          if (!timeseriesMetadataList.isEmpty()) {
-            result.put(device, timeseriesMetadataList);
-          }
+        if (!filteredMeasurements.isEmpty()) {
+          filteredDeviceMeasurementsMap.put(deviceId, filteredMeasurements);
         }
       }
-    } catch (IOException e) {
-      LOGGER.error("Cannot read TsFile {}.", tsFile.getPath(), e);
     }
 
-    return result;
+    return filteredDeviceMeasurementsMap;
   }
 
+  /** @return TabletInsertionEvent in a streaming way */
   public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
     return () ->
         new Iterator<TabletInsertionEvent>() {
 
-          private final Iterator<Tablet> tabletIterator = 
constructTabletIterable().iterator();
+          private TsFileInsertionDataTabletIterator tabletIterator = null;
 
           @Override
           public boolean hasNext() {
-            return tabletIterator.hasNext();
+            return (tabletIterator != null && tabletIterator.hasNext())
+                || deviceMeasurementsMapIterator.hasNext();
           }
 
           @Override
           public TabletInsertionEvent next() {
-            return new PipeTabletTabletInsertionEvent(tabletIterator.next());
+            if (!hasNext()) {
+              throw new NoSuchElementException();
+            }
+
+            while (tabletIterator == null || !tabletIterator.hasNext()) {
+              if (!deviceMeasurementsMapIterator.hasNext()) {
+                throw new NoSuchElementException();
+              }
+
+              final Map.Entry<String, List<String>> entry = 
deviceMeasurementsMapIterator.next();
+
+              try {
+                tabletIterator =
+                    new TsFileInsertionDataTabletIterator(
+                        tsFileReader, measurementDataTypeMap, entry.getKey(), 
entry.getValue());
+              } catch (IOException e) {
+                throw new PipeException("failed to create 
TsFileInsertionDataTabletIterator", e);
+              }
+            }
+
+            final TabletInsertionEvent next =
+                new PipeRawTabletInsertionEvent(tabletIterator.next());
+
+            if (!hasNext()) {
+              try {
+                close();
+              } catch (Exception e) {
+                LOGGER.warn("Failed to close TsFileInsertionDataContainer", e);
+              }
+            }
+
+            return next;
           }
         };
   }
 
-  private Iterable<Tablet> constructTabletIterable() {
-    return () ->
-        new TsFileInsertionDataTabletIterator(tsFile.getPath(), 
device2TimeseriesMetadataMap);
+  @Override
+  public void close() throws Exception {
+    if (tsFileReader != null) {
+      tsFileReader.close();
+    }
+    if (tsFileSequenceReader != null) {
+      tsFileSequenceReader.close();
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataTabletIterator.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataTabletIterator.java
index 7100e68b753..15959adb7ee 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataTabletIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataTabletIterator.java
@@ -20,101 +20,72 @@
 package org.apache.iotdb.db.pipe.core.event.view.datastructure;
 
 import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
-import org.apache.iotdb.tsfile.file.MetaMarker;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
-import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.page.PageReader;
-import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
-import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
-import org.apache.iotdb.tsfile.utils.BitMap;
-import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.read.TsFileReader;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
 
 public class TsFileInsertionDataTabletIterator implements Iterator<Tablet> {
 
-  private static Logger LOGGER = 
LoggerFactory.getLogger(TsFileInsertionDataTabletIterator.class);
-  private final TsFileSequenceReader reader;
-  private final String filePath;
-  private final Iterator<Map.Entry<String, List<TimeseriesMetadata>>> 
entriesIterator;
-  private Map.Entry<String, List<TimeseriesMetadata>> currentEntry;
-  private Iterator<TimeseriesMetadata> timeseriesMetadataIterator;
-  private TimeseriesMetadata currentTimeseriesMetadata;
-  private List<MeasurementSchema> measurementSchemas;
+  private final TsFileReader tsFileReader;
+  private final Map<String, TSDataType> measurementDataTypeMap;
 
-  private boolean isAligned;
-  private final List<long[]> timeBatches;
-  private long[] timestampsForAligned;
+  private final String deviceId;
+  private final List<String> measurements;
 
-  public TsFileInsertionDataTabletIterator(
-      String filePath, Map<String, List<TimeseriesMetadata>> 
device2TimeseriesMetadataMap) {
-    this.filePath = filePath;
-    this.entriesIterator = device2TimeseriesMetadataMap.entrySet().iterator();
-    this.timeBatches = new ArrayList<>();
-    this.currentEntry = null;
-    this.timeseriesMetadataIterator = null;
-    this.currentTimeseriesMetadata = null;
-    this.measurementSchemas = null;
-    this.isAligned = false;
-    this.timestampsForAligned = null;
-    try {
-      this.reader = new TsFileSequenceReader(filePath);
-    } catch (IOException e) {
-      throw new PipeException("Cannot create TsFileSequenceReader for file " + 
filePath, e);
-    }
+  private final QueryDataSet queryDataSet;
 
-    // Initialize timeseriesMetadataIterator if there is a next entry
-    if (entriesIterator.hasNext()) {
-      currentEntry = entriesIterator.next();
-      timeseriesMetadataIterator = currentEntry.getValue().iterator();
-    } else {
-      timeseriesMetadataIterator =
-          new Iterator<TimeseriesMetadata>() {
-            @Override
-            public boolean hasNext() {
-              return false;
-            }
+  public TsFileInsertionDataTabletIterator(
+      TsFileReader tsFileReader,
+      Map<String, TSDataType> measurementDataTypeMap,
+      String deviceId,
+      List<String> measurements)
+      throws IOException {
+    this.tsFileReader = tsFileReader;
+    this.measurementDataTypeMap = measurementDataTypeMap;
+
+    this.deviceId = deviceId;
+    this.measurements =
+        measurements.stream()
+            .filter(
+                measurement ->
+                    // time column in aligned time-series should not be a query column
+                    measurement != null && !measurement.isEmpty())
+            .sorted()
+            .collect(Collectors.toList());
+
+    this.queryDataSet = buildQueryDataSet();
+  }
 
-            @Override
-            public TimeseriesMetadata next() {
-              return null;
-            }
-          };
+  private QueryDataSet buildQueryDataSet() throws IOException {
+    final List<Path> paths = new ArrayList<>();
+    for (String measurement : measurements) {
+      paths.add(new Path(deviceId, measurement, false));
     }
+    return tsFileReader.query(QueryExpression.create(paths, null));
   }
 
   @Override
   public boolean hasNext() {
-    boolean hasNext = timeseriesMetadataIterator.hasNext() || 
entriesIterator.hasNext();
-    if (!hasNext) {
-      try {
-        reader.close();
-      } catch (IOException e) {
-        LOGGER.warn("Cannot close TsFileSequenceReader for file {}", filePath, 
e);
-      }
+    try {
+      return queryDataSet.hasNext();
+    } catch (IOException e) {
+      throw new PipeException("Failed to check next", e);
     }
-    return hasNext;
   }
 
   @Override
@@ -123,165 +94,48 @@ public class TsFileInsertionDataTabletIterator implements Iterator<Tablet> {
       throw new NoSuchElementException();
     }
 
-    if (!timeseriesMetadataIterator.hasNext()) {
-      currentEntry = entriesIterator.next();
-      timeseriesMetadataIterator = currentEntry.getValue().iterator();
-    }
-    currentTimeseriesMetadata = timeseriesMetadataIterator.next();
-    measurementSchemas = new ArrayList<>();
-
-    try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) {
-      if (currentTimeseriesMetadata.getTSDataType() == TSDataType.VECTOR) {
-        processTimeseriesMetadata(currentTimeseriesMetadata, reader);
-        currentTimeseriesMetadata = timeseriesMetadataIterator.next();
-      }
-      return processTimeseriesMetadata(currentTimeseriesMetadata, reader);
+    try {
+      return buildNextTablet();
     } catch (IOException e) {
-      throw new UncheckedIOException(e);
-    }
-  }
-
-  private Tablet createTablet(long[] timestamps, Object[] values, BitMap[] 
bitMaps) {
-    long[] tmp;
-
-    if (isAligned) {
-      if (timestampsForAligned == null) {
-        timestampsForAligned = timestamps;
-        return null;
-      }
-      tmp = timestampsForAligned;
-    } else {
-      tmp = timestamps;
+      throw new PipeException("Failed to build tablet", e);
     }
-
-    // create tablet
-    int rowSize = tmp.length;
-    Tablet tablet = new Tablet(currentEntry.getKey(), measurementSchemas, 
rowSize);
-    tablet.timestamps = tmp;
-    tablet.values = values;
-    tablet.rowSize = rowSize;
-    tablet.bitMaps = bitMaps;
-
-    return tablet;
   }
 
-  private Tablet processTimeseriesMetadata(
-      TimeseriesMetadata timeseriesMetadata, TsFileSequenceReader reader) {
-    int pageIndex = 0;
-    if (timeseriesMetadata.getTSDataType() == TSDataType.VECTOR) {
-      isAligned = true;
-      timeBatches.clear();
-    } else {
-      MeasurementSchema measurementSchema =
-          new MeasurementSchema(
-              timeseriesMetadata.getMeasurementId(), 
timeseriesMetadata.getTSDataType());
-      measurementSchemas.add(measurementSchema);
+  private Tablet buildNextTablet() throws IOException {
+    final List<MeasurementSchema> schemas = new ArrayList<>();
+    for (final String measurement : measurements) {
+      final TSDataType dataType =
+          measurementDataTypeMap.get(deviceId + TsFileConstant.PATH_SEPARATOR 
+ measurement);
+      schemas.add(new MeasurementSchema(measurement, dataType));
     }
+    final Tablet tablet = new Tablet(deviceId, schemas);
+    tablet.initBitMaps();
 
-    List<Byte> bitMapBytes = new ArrayList<>();
-    List<Object> measurementValues = new ArrayList<>();
-    List<Long> measurementTimestamps = new ArrayList<>();
-
-    for (IChunkMetadata chunkMetadata : 
timeseriesMetadata.getChunkMetadataList()) {
-      long offset = chunkMetadata.getOffsetOfChunkHeader();
-      try {
-        reader.position(offset);
-        ChunkHeader header = reader.readChunkHeader(reader.readMarker());
-        int dataSize = header.getDataSize();
-
-        Decoder defaultTimeDecoder =
-            Decoder.getDecoderByType(
-                
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
-                TSDataType.INT64);
-        Decoder valueDecoder =
-            Decoder.getDecoderByType(header.getEncodingType(), 
header.getDataType());
-        pageIndex = 0;
-        if (header.getDataType() == TSDataType.VECTOR) {
-          timeBatches.clear();
-        }
-
-        while (dataSize > 0) {
-          PageHeader pageHeader =
-              reader.readPageHeader(
-                  header.getDataType(), (header.getChunkType() & 0x3F) == 
MetaMarker.CHUNK_HEADER);
-          ByteBuffer pageData = reader.readPage(pageHeader, 
header.getCompressionType());
+    while (queryDataSet.hasNext()) {
+      final RowRecord rowRecord = queryDataSet.next();
 
-          // Time column chunk
-          if ((header.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
-              == TsFileConstant.TIME_COLUMN_MASK) {
-            TimePageReader timePageReader =
-                new TimePageReader(pageHeader, pageData, defaultTimeDecoder);
-            long[] timeBatch = timePageReader.getNextTimeBatch();
-            timeBatches.add(timeBatch);
+      final int rowIndex = tablet.rowSize;
 
-            for (long time : timeBatch) {
-              measurementTimestamps.add(time);
-            }
-          }
-          // Value column chunk
-          else if ((header.getChunkType() & TsFileConstant.VALUE_COLUMN_MASK)
-              == TsFileConstant.VALUE_COLUMN_MASK) {
-            ValuePageReader valuePageReader =
-                new ValuePageReader(pageHeader, pageData, 
header.getDataType(), valueDecoder);
+      tablet.addTimestamp(rowIndex, rowRecord.getTimestamp());
 
-            for (byte value : valuePageReader.getBitmap()) {
-              bitMapBytes.add(value);
-            }
-
-            for (TsPrimitiveType value :
-                valuePageReader.nextValueBatch(timeBatches.get(pageIndex))) {
-              measurementValues.add(value.getValue());
-            }
-          }
-
-          // NonAligned Chunk
-          else {
-            PageReader pageReader =
-                new PageReader(
-                    pageData, header.getDataType(), valueDecoder, 
defaultTimeDecoder, null);
-            BatchData batchData = pageReader.getAllSatisfiedPageData();
-            List<Integer> isNullList = new ArrayList<>();
-            int index = 0;
-            while (batchData.hasCurrent()) {
-              measurementTimestamps.add(batchData.currentTime());
-              Object value = batchData.currentValue();
-
-              if (value == null) {
-                isNullList.add(index);
-              }
-              measurementValues.add(value);
-              index++;
-              batchData.next();
-            }
-
-            BitMap bitmap = new BitMap(measurementTimestamps.size());
-            for (int isNull : isNullList) {
-              bitmap.mark(isNull);
-            }
-            byte[] bytes = bitmap.getByteArray();
-            for (byte value : bytes) {
-              bitMapBytes.add(value);
-            }
-          }
-          pageIndex++;
-          dataSize -= pageHeader.getSerializedPageSize();
+      final List<Field> fields = rowRecord.getFields();
+      final int fieldSize = fields.size();
+      for (int i = 0; i < fieldSize; i++) {
+        final Field field = fields.get(i);
+        if (field == null || field.getDataType() == null) {
+          tablet.bitMaps[i].mark(rowIndex);
+        } else {
+          tablet.addValue(measurements.get(i), rowIndex, 
field.getObjectValue(field.getDataType()));
         }
-      } catch (IOException e) {
-        throw new UncheckedIOException(e);
       }
-    }
 
-    long[] timestamps = new long[measurementTimestamps.size()];
-    for (int i = 0; i < measurementTimestamps.size(); i++) {
-      timestamps[i] = measurementTimestamps.get(i);
-    }
+      tablet.rowSize++;
 
-    byte[] byteArray = new byte[bitMapBytes.size()];
-    for (int i = 0; i < bitMapBytes.size(); i++) {
-      byteArray[i] = bitMapBytes.get(i);
+      if (tablet.rowSize == tablet.getMaxRowNumber()) {
+        break;
+      }
     }
-    BitMap[] bitMaps = new BitMap[] {new BitMap(byteArray.length, byteArray)};
 
-    return createTablet(timestamps, measurementValues.toArray(), bitMaps);
+    return tablet;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java
index 62979b7d52c..c9774b4618a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java
@@ -56,15 +56,23 @@ public class PipeDoNothingProcessor implements 
PipeProcessor {
           .equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) {
         eventCollector.collect(tabletInsertionEvent);
       } else {
-        eventCollector.collect(
-            tabletInsertionEvent.processRowByRow(
+        tabletInsertionEvent
+            .processRowByRow(
                 (row, rowCollector) -> {
                   try {
                     rowCollector.collectRow(row);
                   } catch (IOException e) {
                     throw new PipeException("Failed to collect row", e);
                   }
-                }));
+                })
+            .forEach(
+                event -> {
+                  try {
+                    eventCollector.collect(event);
+                  } catch (IOException e) {
+                    throw new PipeException("Failed to collect event", e);
+                  }
+                });
       }
     } else {
       eventCollector.collect(tabletInsertionEvent);
diff --git 
a/server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeInsertNodeTabletInsertionEventTest.java
 
b/server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEventTest.java
similarity index 78%
rename from 
server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeInsertNodeTabletInsertionEventTest.java
rename to 
server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEventTest.java
index 5a211fc2cff..6a68b5ad7cd 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeInsertNodeTabletInsertionEventTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEventTest.java
@@ -24,7 +24,8 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
-import 
org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.core.event.impl.PipeRawTabletInsertionEvent;
+import 
org.apache.iotdb.db.pipe.core.event.view.datastructure.TabletInsertionDataContainer;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
@@ -37,7 +38,7 @@ import org.junit.Test;
 
 import java.util.Arrays;
 
-public class PipeInsertNodeTabletInsertionEventTest {
+public class PipeTabletInsertionEventTest {
 
   InsertRowNode insertRowNode;
   InsertTabletNode insertTabletNode;
@@ -56,7 +57,6 @@ public class PipeInsertNodeTabletInsertionEventTest {
       };
 
   final MeasurementSchema[] schemas = new MeasurementSchema[6];
-  final Object[] values = new Object[6];
 
   final String pattern = "root.sg.d1";
 
@@ -78,6 +78,14 @@ public class PipeInsertNodeTabletInsertionEventTest {
   }
 
   private void createInsertRowNode() throws IllegalPathException {
+    final Object[] values = new Object[6];
+
+    values[0] = 100;
+    values[1] = 10000L;
+    values[2] = 2F;
+    values[3] = 1.0;
+    values[4] = false;
+    values[5] = Binary.valueOf("text");
 
     insertRowNode =
         new InsertRowNode(
@@ -93,6 +101,24 @@ public class PipeInsertNodeTabletInsertionEventTest {
   }
 
   private void createInsertTabletNode() throws IllegalPathException {
+    final Object[] values = new Object[6];
+
+    values[0] = new int[5];
+    values[1] = new long[5];
+    values[2] = new float[5];
+    values[3] = new double[5];
+    values[4] = new boolean[5];
+    values[5] = new Binary[5];
+
+    for (int r = 0; r < 5; r++) {
+      ((int[]) values[0])[r] = 100;
+      ((long[]) values[1])[r] = 10000;
+      ((float[]) values[2])[r] = 2;
+      ((double[]) values[3])[r] = 1.0;
+      ((boolean[]) values[4])[r] = false;
+      ((Binary[]) values[5])[r] = Binary.valueOf("text");
+    }
+
     this.insertTabletNode =
         new InsertTabletNode(
             new PlanNodeId("plannode 1"),
@@ -108,6 +134,7 @@ public class PipeInsertNodeTabletInsertionEventTest {
   }
 
   private void createTablet() {
+    final Object[] values = new Object[6];
 
     // create tablet for insertRowNode
     BitMap[] bitMapsForInsertRowNode = new BitMap[6];
@@ -167,12 +194,18 @@ public class PipeInsertNodeTabletInsertionEventTest {
 
   @Test
   public void convertToTabletForTest() {
-    PipeInsertNodeTabletInsertionEvent event1 = new 
PipeInsertNodeTabletInsertionEvent(null, null);
-    Tablet tablet1 = event1.convertToTabletForTest(insertRowNode, pattern);
+    Tablet tablet1 = new TabletInsertionDataContainer(insertRowNode, 
pattern).convertToTablet();
     Assert.assertEquals(tablet1, tabletForInsertRowNode);
 
-    PipeInsertNodeTabletInsertionEvent event2 = new 
PipeInsertNodeTabletInsertionEvent(null, null);
-    Tablet tablet2 = event2.convertToTabletForTest(insertTabletNode, pattern);
+    Tablet tablet2 = new TabletInsertionDataContainer(insertTabletNode, 
pattern).convertToTablet();
     Assert.assertEquals(tablet2, tabletForInsertTabletNode);
+
+    PipeRawTabletInsertionEvent event3 = new 
PipeRawTabletInsertionEvent(tablet1, pattern);
+    Tablet tablet3 = event3.convertToTablet();
+    Assert.assertEquals(tablet1, tablet3);
+
+    PipeRawTabletInsertionEvent event4 = new 
PipeRawTabletInsertionEvent(tablet2, pattern);
+    Tablet tablet4 = event4.convertToTablet();
+    Assert.assertEquals(tablet2, tablet4);
   }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/pipe/core/event/TsFileInsertionDataContainerTest.java
 
b/server/src/test/java/org/apache/iotdb/db/pipe/core/event/TsFileInsertionDataContainerTest.java
new file mode 100644
index 00000000000..d74f790fd15
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/pipe/core/event/TsFileInsertionDataContainerTest.java
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.core.event;
+
+import org.apache.iotdb.db.pipe.core.event.impl.PipeRawTabletInsertionEvent;
+import 
org.apache.iotdb.db.pipe.core.event.view.datastructure.TsFileInsertionDataContainer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.fail;
+
+public class TsFileInsertionDataContainerTest {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(TsFileInsertionDataContainerTest.class);
+
+  private File alignedTsFile;
+  private File nonalignedTsFile;
+
+  @After
+  public void tearDown() throws Exception {
+    if (alignedTsFile != null) {
+      alignedTsFile.delete();
+    }
+    if (nonalignedTsFile != null) {
+      nonalignedTsFile.delete();
+    }
+  }
+
+  @Test
+  public void testToTabletInsertionEvents() throws Exception {
+    Set<Integer> deviceNumbers = new HashSet<>();
+    deviceNumbers.add(1);
+    deviceNumbers.add(2);
+
+    Set<Integer> measurementNumbers = new HashSet<>();
+    measurementNumbers.add(1);
+    measurementNumbers.add(2);
+
+    for (int deviceNumber : deviceNumbers) {
+      for (int measurementNumber : measurementNumbers) {
+        testToTabletInsertionEvents(deviceNumber, measurementNumber, 0);
+        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1);
+        testToTabletInsertionEvents(deviceNumber, measurementNumber, 2);
+
+        testToTabletInsertionEvents(deviceNumber, measurementNumber, 999);
+        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1000);
+        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1001);
+
+        testToTabletInsertionEvents(deviceNumber, measurementNumber, 999 * 2 + 1);
+        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1000 * 2);
+        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1001 * 2 - 1);
+
+        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1023);
+        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1024);
+        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1025);
+
+        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1023 * 2 + 1);
+        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1024 * 2);
+        testToTabletInsertionEvents(deviceNumber, measurementNumber, 1025 * 2 - 1);
+
+        testToTabletInsertionEvents(deviceNumber, measurementNumber, 10001);
+      }
+    }
+  }
+
+  private void testToTabletInsertionEvents(
+      int deviceNumber, int measurementNumber, int rowNumberInOneDevice) 
throws Exception {
+    LOGGER.info(
+        "testToTabletInsertionEvents: deviceNumber = {}, measurementNumber = 
{}, rowNumberInOneDevice = {}",
+        deviceNumber,
+        measurementNumber,
+        rowNumberInOneDevice);
+
+    alignedTsFile =
+        TsFileGeneratorUtils.generateAlignedTsFile(
+            "aligned.tsfile",
+            deviceNumber,
+            measurementNumber,
+            rowNumberInOneDevice,
+            300,
+            10000,
+            700,
+            50);
+    nonalignedTsFile =
+        TsFileGeneratorUtils.generateNonAlignedTsFile(
+            "nonaligned.tsfile",
+            deviceNumber,
+            measurementNumber,
+            rowNumberInOneDevice,
+            300,
+            10000,
+            700,
+            50);
+
+    try (final TsFileInsertionDataContainer alignedContainer =
+            new TsFileInsertionDataContainer(alignedTsFile, "root");
+        final TsFileInsertionDataContainer nonalignedContainer =
+            new TsFileInsertionDataContainer(nonalignedTsFile, "root"); ) {
+      AtomicInteger count1 = new AtomicInteger(0);
+      AtomicInteger count2 = new AtomicInteger(0);
+      AtomicInteger count3 = new AtomicInteger(0);
+
+      alignedContainer
+          .toTabletInsertionEvents()
+          .forEach(
+              event ->
+                  event
+                      .processRowByRow(
+                          (row, collector) -> {
+                            try {
+                              collector.collectRow(row);
+                              Assert.assertEquals(measurementNumber, 
row.size());
+                              count1.incrementAndGet();
+                            } catch (IOException e) {
+                              throw new RuntimeException(e);
+                            }
+                          })
+                      .forEach(
+                          tabletInsertionEvent1 ->
+                              tabletInsertionEvent1
+                                  .processRowByRow(
+                                      (row, collector) -> {
+                                        try {
+                                          collector.collectRow(row);
+                                          
Assert.assertEquals(measurementNumber, row.size());
+                                          count2.incrementAndGet();
+                                        } catch (IOException e) {
+                                          throw new RuntimeException(e);
+                                        }
+                                      })
+                                  .forEach(
+                                      tabletInsertionEvent2 ->
+                                          tabletInsertionEvent2.processTablet(
+                                              (tablet, rowCollector) -> {
+                                                new 
PipeRawTabletInsertionEvent(tablet)
+                                                    .processRowByRow(
+                                                        (row, collector) -> {
+                                                          try {
+                                                            
rowCollector.collectRow(row);
+                                                            
Assert.assertEquals(
+                                                                
measurementNumber, row.size());
+                                                            
count3.incrementAndGet();
+                                                          } catch (IOException 
e) {
+                                                            throw new 
RuntimeException(e);
+                                                          }
+                                                        });
+                                              }))));
+
+      Assert.assertEquals(count1.getAndSet(0), deviceNumber * 
rowNumberInOneDevice);
+      Assert.assertEquals(count2.getAndSet(0), deviceNumber * 
rowNumberInOneDevice);
+      Assert.assertEquals(count3.getAndSet(0), deviceNumber * 
rowNumberInOneDevice);
+
+      nonalignedContainer
+          .toTabletInsertionEvents()
+          .forEach(
+              event ->
+                  event
+                      .processTablet(
+                          (tablet, rowCollector) -> {
+                            new PipeRawTabletInsertionEvent(tablet)
+                                .processRowByRow(
+                                    (row, collector) -> {
+                                      try {
+                                        rowCollector.collectRow(row);
+                                        Assert.assertEquals(measurementNumber, 
row.size());
+                                        count1.incrementAndGet();
+                                      } catch (IOException e) {
+                                        throw new RuntimeException(e);
+                                      }
+                                    });
+                          })
+                      .forEach(
+                          tabletInsertionEvent1 ->
+                              tabletInsertionEvent1
+                                  .processRowByRow(
+                                      (row, collector) -> {
+                                        try {
+                                          collector.collectRow(row);
+                                          
Assert.assertEquals(measurementNumber, row.size());
+                                          count2.incrementAndGet();
+                                        } catch (IOException e) {
+                                          throw new RuntimeException(e);
+                                        }
+                                      })
+                                  .forEach(
+                                      tabletInsertionEvent2 ->
+                                          
tabletInsertionEvent2.processRowByRow(
+                                              (row, collector) -> {
+                                                try {
+                                                  collector.collectRow(row);
+                                                  Assert.assertEquals(
+                                                      measurementNumber, 
row.size());
+                                                  count3.incrementAndGet();
+                                                } catch (IOException e) {
+                                                  throw new 
RuntimeException(e);
+                                                }
+                                              }))));
+
+      Assert.assertEquals(count1.get(), deviceNumber * rowNumberInOneDevice);
+      Assert.assertEquals(count2.get(), deviceNumber * rowNumberInOneDevice);
+      Assert.assertEquals(count3.get(), deviceNumber * rowNumberInOneDevice);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    AtomicReference<String> oneDeviceInAlignedTsFile = new AtomicReference<>();
+    AtomicReference<String> oneMeasurementInAlignedTsFile = new 
AtomicReference<>();
+
+    AtomicReference<String> oneDeviceInUnalignedTsFile = new 
AtomicReference<>();
+    AtomicReference<String> oneMeasurementInUnalignedTsFile = new 
AtomicReference<>();
+
+    try (TsFileSequenceReader alignedReader =
+            new TsFileSequenceReader(alignedTsFile.getAbsolutePath());
+        TsFileSequenceReader nonalignedReader =
+            new TsFileSequenceReader(nonalignedTsFile.getAbsolutePath())) {
+
+      alignedReader
+          .getDeviceMeasurementsMap()
+          .forEach(
+              (k, v) ->
+                  v.stream()
+                      .filter(p -> p != null && !p.isEmpty())
+                      .forEach(
+                          p -> {
+                            oneDeviceInAlignedTsFile.set(k);
+                            oneMeasurementInAlignedTsFile.set(new Path(k, p, 
false).toString());
+                          }));
+      nonalignedReader
+          .getDeviceMeasurementsMap()
+          .forEach(
+              (k, v) ->
+                  v.stream()
+                      .filter(p -> p != null && !p.isEmpty())
+                      .forEach(
+                          p -> {
+                            oneDeviceInUnalignedTsFile.set(k);
+                            oneMeasurementInUnalignedTsFile.set(new Path(k, p, 
false).toString());
+                          }));
+    }
+
+    try (final TsFileInsertionDataContainer alignedContainer =
+            new TsFileInsertionDataContainer(alignedTsFile, 
oneDeviceInAlignedTsFile.get());
+        final TsFileInsertionDataContainer nonalignedContainer =
+            new TsFileInsertionDataContainer(
+                nonalignedTsFile, oneDeviceInUnalignedTsFile.get()); ) {
+      AtomicInteger count1 = new AtomicInteger(0);
+      AtomicInteger count2 = new AtomicInteger(0);
+      AtomicInteger count3 = new AtomicInteger(0);
+
+      alignedContainer
+          .toTabletInsertionEvents()
+          .forEach(
+              event ->
+                  event
+                      .processRowByRow(
+                          (row, collector) -> {
+                            try {
+                              collector.collectRow(row);
+                              Assert.assertEquals(measurementNumber, 
row.size());
+                              count1.incrementAndGet();
+                            } catch (IOException e) {
+                              throw new RuntimeException(e);
+                            }
+                          })
+                      .forEach(
+                          tabletInsertionEvent1 ->
+                              tabletInsertionEvent1
+                                  .processRowByRow(
+                                      (row, collector) -> {
+                                        try {
+                                          collector.collectRow(row);
+                                          
Assert.assertEquals(measurementNumber, row.size());
+                                          count2.incrementAndGet();
+                                        } catch (IOException e) {
+                                          throw new RuntimeException(e);
+                                        }
+                                      })
+                                  .forEach(
+                                      tabletInsertionEvent2 ->
+                                          tabletInsertionEvent2.processTablet(
+                                              (tablet, rowCollector) -> {
+                                                new 
PipeRawTabletInsertionEvent(tablet)
+                                                    .processRowByRow(
+                                                        (row, collector) -> {
+                                                          try {
+                                                            
rowCollector.collectRow(row);
+                                                            
Assert.assertEquals(
+                                                                
measurementNumber, row.size());
+                                                            
count3.incrementAndGet();
+                                                          } catch (IOException 
e) {
+                                                            throw new 
RuntimeException(e);
+                                                          }
+                                                        });
+                                              }))));
+
+      Assert.assertEquals(count1.getAndSet(0), rowNumberInOneDevice);
+      Assert.assertEquals(count2.getAndSet(0), rowNumberInOneDevice);
+      Assert.assertEquals(count3.getAndSet(0), rowNumberInOneDevice);
+
+      nonalignedContainer
+          .toTabletInsertionEvents()
+          .forEach(
+              event ->
+                  event
+                      .processTablet(
+                          (tablet, rowCollector) -> {
+                            new PipeRawTabletInsertionEvent(tablet)
+                                .processRowByRow(
+                                    (row, collector) -> {
+                                      try {
+                                        rowCollector.collectRow(row);
+                                        Assert.assertEquals(measurementNumber, 
row.size());
+                                        count1.incrementAndGet();
+                                      } catch (IOException e) {
+                                        throw new RuntimeException(e);
+                                      }
+                                    });
+                          })
+                      .forEach(
+                          tabletInsertionEvent1 ->
+                              tabletInsertionEvent1
+                                  .processRowByRow(
+                                      (row, collector) -> {
+                                        try {
+                                          collector.collectRow(row);
+                                          
Assert.assertEquals(measurementNumber, row.size());
+                                          count2.incrementAndGet();
+                                        } catch (IOException e) {
+                                          throw new RuntimeException(e);
+                                        }
+                                      })
+                                  .forEach(
+                                      tabletInsertionEvent2 ->
+                                          
tabletInsertionEvent2.processRowByRow(
+                                              (row, collector) -> {
+                                                try {
+                                                  collector.collectRow(row);
+                                                  Assert.assertEquals(
+                                                      measurementNumber, 
row.size());
+                                                  count3.incrementAndGet();
+                                                } catch (IOException e) {
+                                                  throw new 
RuntimeException(e);
+                                                }
+                                              }))));
+
+      Assert.assertEquals(count1.get(), rowNumberInOneDevice);
+      Assert.assertEquals(count2.get(), rowNumberInOneDevice);
+      Assert.assertEquals(count3.get(), rowNumberInOneDevice);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    try (final TsFileInsertionDataContainer alignedContainer =
+            new TsFileInsertionDataContainer(alignedTsFile, 
oneMeasurementInAlignedTsFile.get());
+        final TsFileInsertionDataContainer nonalignedContainer =
+            new TsFileInsertionDataContainer(
+                nonalignedTsFile, oneMeasurementInUnalignedTsFile.get()); ) {
+      AtomicInteger count1 = new AtomicInteger(0);
+      AtomicInteger count2 = new AtomicInteger(0);
+      AtomicInteger count3 = new AtomicInteger(0);
+
+      alignedContainer
+          .toTabletInsertionEvents()
+          .forEach(
+              event ->
+                  event
+                      .processRowByRow(
+                          (row, collector) -> {
+                            try {
+                              collector.collectRow(row);
+                              Assert.assertEquals(1, row.size());
+                              count1.incrementAndGet();
+                            } catch (IOException e) {
+                              throw new RuntimeException(e);
+                            }
+                          })
+                      .forEach(
+                          tabletInsertionEvent1 ->
+                              tabletInsertionEvent1
+                                  .processRowByRow(
+                                      (row, collector) -> {
+                                        try {
+                                          collector.collectRow(row);
+                                          Assert.assertEquals(1, row.size());
+                                          count2.incrementAndGet();
+                                        } catch (IOException e) {
+                                          throw new RuntimeException(e);
+                                        }
+                                      })
+                                  .forEach(
+                                      tabletInsertionEvent2 ->
+                                          tabletInsertionEvent2.processTablet(
+                                              (tablet, rowCollector) -> {
+                                                new 
PipeRawTabletInsertionEvent(tablet)
+                                                    .processRowByRow(
+                                                        (row, collector) -> {
+                                                          try {
+                                                            
rowCollector.collectRow(row);
+                                                            
Assert.assertEquals(1, row.size());
+                                                            
count3.incrementAndGet();
+                                                          } catch (IOException 
e) {
+                                                            throw new 
RuntimeException(e);
+                                                          }
+                                                        });
+                                              }))));
+
+      Assert.assertEquals(count1.getAndSet(0), rowNumberInOneDevice);
+      Assert.assertEquals(count2.getAndSet(0), rowNumberInOneDevice);
+      Assert.assertEquals(count3.getAndSet(0), rowNumberInOneDevice);
+
+      nonalignedContainer
+          .toTabletInsertionEvents()
+          .forEach(
+              event ->
+                  event
+                      .processTablet(
+                          (tablet, rowCollector) -> {
+                            new PipeRawTabletInsertionEvent(tablet)
+                                .processRowByRow(
+                                    (row, collector) -> {
+                                      try {
+                                        rowCollector.collectRow(row);
+                                        Assert.assertEquals(1, row.size());
+                                        count1.incrementAndGet();
+                                      } catch (IOException e) {
+                                        throw new RuntimeException(e);
+                                      }
+                                    });
+                          })
+                      .forEach(
+                          tabletInsertionEvent1 ->
+                              tabletInsertionEvent1
+                                  .processRowByRow(
+                                      (row, collector) -> {
+                                        try {
+                                          collector.collectRow(row);
+                                          Assert.assertEquals(1, row.size());
+                                          count2.incrementAndGet();
+                                        } catch (IOException e) {
+                                          throw new RuntimeException(e);
+                                        }
+                                      })
+                                  .forEach(
+                                      tabletInsertionEvent2 ->
+                                          
tabletInsertionEvent2.processRowByRow(
+                                              (row, collector) -> {
+                                                try {
+                                                  collector.collectRow(row);
+                                                  Assert.assertEquals(1, 
row.size());
+                                                  count3.incrementAndGet();
+                                                } catch (IOException e) {
+                                                  throw new 
RuntimeException(e);
+                                                }
+                                              }))));
+
+      Assert.assertEquals(count1.get(), rowNumberInOneDevice);
+      Assert.assertEquals(count2.get(), rowNumberInOneDevice);
+      Assert.assertEquals(count3.get(), rowNumberInOneDevice);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    try (final TsFileInsertionDataContainer alignedContainer =
+            new TsFileInsertionDataContainer(alignedTsFile, 
"not-exist-pattern");
+        final TsFileInsertionDataContainer nonalignedContainer =
+            new TsFileInsertionDataContainer(nonalignedTsFile, 
"not-exist-pattern"); ) {
+      AtomicInteger count1 = new AtomicInteger(0);
+      AtomicInteger count2 = new AtomicInteger(0);
+      AtomicInteger count3 = new AtomicInteger(0);
+
+      alignedContainer
+          .toTabletInsertionEvents()
+          .forEach(
+              event ->
+                  event
+                      .processRowByRow(
+                          (row, collector) -> {
+                            try {
+                              collector.collectRow(row);
+                              Assert.assertEquals(0, row.size());
+                              count1.incrementAndGet();
+                            } catch (IOException e) {
+                              throw new RuntimeException(e);
+                            }
+                          })
+                      .forEach(
+                          tabletInsertionEvent1 ->
+                              tabletInsertionEvent1
+                                  .processRowByRow(
+                                      (row, collector) -> {
+                                        try {
+                                          collector.collectRow(row);
+                                          Assert.assertEquals(0, row.size());
+                                          count2.incrementAndGet();
+                                        } catch (IOException e) {
+                                          throw new RuntimeException(e);
+                                        }
+                                      })
+                                  .forEach(
+                                      tabletInsertionEvent2 ->
+                                          tabletInsertionEvent2.processTablet(
+                                              (tablet, rowCollector) -> {
+                                                new 
PipeRawTabletInsertionEvent(tablet)
+                                                    .processRowByRow(
+                                                        (row, collector) -> {
+                                                          try {
+                                                            
rowCollector.collectRow(row);
+                                                            
Assert.assertEquals(0, row.size());
+                                                            
count3.incrementAndGet();
+                                                          } catch (IOException 
e) {
+                                                            throw new 
RuntimeException(e);
+                                                          }
+                                                        });
+                                              }))));
+
+      Assert.assertEquals(count1.getAndSet(0), 0);
+      Assert.assertEquals(count2.getAndSet(0), 0);
+      Assert.assertEquals(count3.getAndSet(0), 0);
+
+      nonalignedContainer
+          .toTabletInsertionEvents()
+          .forEach(
+              event ->
+                  event
+                      .processTablet(
+                          (tablet, rowCollector) -> {
+                            new PipeRawTabletInsertionEvent(tablet)
+                                .processRowByRow(
+                                    (row, collector) -> {
+                                      try {
+                                        rowCollector.collectRow(row);
+                                        Assert.assertEquals(0, row.size());
+                                        count1.incrementAndGet();
+                                      } catch (IOException e) {
+                                        throw new RuntimeException(e);
+                                      }
+                                    });
+                          })
+                      .forEach(
+                          tabletInsertionEvent1 ->
+                              tabletInsertionEvent1
+                                  .processRowByRow(
+                                      (row, collector) -> {
+                                        try {
+                                          collector.collectRow(row);
+                                          Assert.assertEquals(0, row.size());
+                                          count2.incrementAndGet();
+                                        } catch (IOException e) {
+                                          throw new RuntimeException(e);
+                                        }
+                                      })
+                                  .forEach(
+                                      tabletInsertionEvent2 ->
+                                          
tabletInsertionEvent2.processRowByRow(
+                                              (row, collector) -> {
+                                                try {
+                                                  collector.collectRow(row);
+                                                  Assert.assertEquals(0, 
row.size());
+                                                  count3.incrementAndGet();
+                                                } catch (IOException e) {
+                                                  throw new 
RuntimeException(e);
+                                                }
+                                              }))));
+
+      Assert.assertEquals(count1.get(), 0);
+      Assert.assertEquals(count2.get(), 0);
+      Assert.assertEquals(count3.get(), 0);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 6c29e7c4c44..8e760edaef4 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -2049,6 +2049,24 @@ public class TsFileSequenceReader implements 
AutoCloseable {
     return result;
   }
 
+  /**
+   * Builds a map from the full path of every timeseries found in this file to its data type.
+   *
+   * <p>For each device returned by {@code getAllDevices()}, the device's timeseries metadata is
+   * read with {@code readDeviceMetadata(device)} and one entry is added per metadata item. The
+   * key is the device name, {@code TsFileConstant.PATH_SEPARATOR} and the measurement id,
+   * concatenated in that order; the value is the item's {@code TSDataType}. If two items
+   * produce the same key, the one processed later replaces the earlier one.
+   *
+   * @return a newly created map of full path -> data type; empty when no metadata item is found
+   * @throws IOException presumably raised while reading devices or their metadata from the file
+   *     (the throwing calls are defined outside this block; confirm against them)
+   */
+  public Map<String, TSDataType> getFullPathDataTypeMap() throws IOException {
+    final Map<String, TSDataType> result = new HashMap<>();
+    for (final String device : getAllDevices()) {
+      Map<String, TimeseriesMetadata> timeseriesMetadataMap = 
readDeviceMetadata(device);
+      for (TimeseriesMetadata timeseriesMetadata : 
timeseriesMetadataMap.values()) {
+        result.put(
+            device + TsFileConstant.PATH_SEPARATOR + 
timeseriesMetadata.getMeasurementId(),
+            timeseriesMetadata.getTSDataType());
+      }
+    }
+    return result;
+  }
+
   public Map<String, List<String>> getDeviceMeasurementsMap() throws 
IOException {
     Map<String, List<String>> result = new HashMap<>();
     for (String device : getAllDevices()) {

Reply via email to