This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e86379d82 [IOTDB-4681] speed up mpp load (#7725)
3e86379d82 is described below

commit 3e86379d829c3967178b692e500c37ac12549b8f
Author: yschengzi <[email protected]>
AuthorDate: Tue Nov 8 08:57:11 2022 +0800

    [IOTDB-4681] speed up mpp load (#7725)
---
 .../iotdb/commons/partition/DataPartition.java     |   8 -
 .../iotdb/db/engine/load/AlignedChunkData.java     | 322 ++++++----------
 .../org/apache/iotdb/db/engine/load/ChunkData.java |  25 +-
 .../apache/iotdb/db/engine/load/DeletionData.java  |   4 +-
 .../iotdb/db/engine/load/LoadTsFileManager.java    |  17 +-
 .../iotdb/db/engine/load/NonAlignedChunkData.java  | 245 +++++-------
 .../apache/iotdb/db/engine/load/TsFileData.java    |   3 +-
 .../load/TsFileSplitter.java}                      | 415 ++++++++-------------
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  17 +-
 .../plan/node/load/LoadSingleTsFileNode.java       | 415 ++-------------------
 .../planner/plan/node/load/LoadTsFileNode.java     |  19 +-
 .../plan/node/load/LoadTsFilePieceNode.java        |  13 +-
 .../scheduler/load/LoadTsFileDispatcherImpl.java   |   2 +-
 .../plan/scheduler/load/LoadTsFileScheduler.java   | 230 ++++++++----
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java |  24 ++
 .../apache/iotdb/db/utils/TimePartitionUtils.java  |   4 +
 .../iotdb/tsfile/file/header/ChunkHeader.java      |  21 ++
 .../iotdb/tsfile/read/TsFileSequenceReader.java    |  22 ++
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |   4 +
 .../write/writer/TsFileIOWriterEndFileTest.java    |  49 +++
 20 files changed, 707 insertions(+), 1152 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 4164fc716b..00c2139556 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -97,14 +97,6 @@ public class DataPartition extends Partition {
         .collect(Collectors.toList());
   }
 
-  public List<TRegionReplicaSet> getAllDataRegionReplicaSetForOneDevice(String 
deviceName) {
-    String storageGroup = getStorageGroupByDevice(deviceName);
-    TSeriesPartitionSlot seriesPartitionSlot = 
calculateDeviceGroupId(deviceName);
-    return 
dataPartitionMap.get(storageGroup).get(seriesPartitionSlot).entrySet().stream()
-        .flatMap(entry -> entry.getValue().stream())
-        .collect(Collectors.toList());
-  }
-
   public TRegionReplicaSet getDataRegionReplicaSetForWriting(
       String deviceName, TTimePartitionSlot timePartitionSlot) {
     // A list of data region replica sets will store data in a same time 
partition.
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/load/AlignedChunkData.java b/server/src/main/java/org/apache/iotdb/db/engine/load/AlignedChunkData.java
index 103503d870..d81117e42b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/load/AlignedChunkData.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/load/AlignedChunkData.java
@@ -21,22 +21,16 @@ package org.apache.iotdb.db.engine.load;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.db.utils.TimePartitionUtils;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
 import org.apache.iotdb.tsfile.exception.write.PageException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
-import org.apache.iotdb.tsfile.file.MetaMarker;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
-import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
@@ -45,13 +39,14 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 
 public class AlignedChunkData implements ChunkData {
   private static final int DEFAULT_INT32 = 0;
@@ -61,33 +56,35 @@ public class AlignedChunkData implements ChunkData {
   private static final boolean DEFAULT_BOOLEAN = false;
   private static final Binary DEFAULT_BINARY = null;
 
-  private List<Long> offset;
-  private List<Long> dataSize;
-  private boolean isHeadPageNeedDecode;
-  private boolean isTailPageNeedDecode;
-
-  private TTimePartitionSlot timePartitionSlot;
-  private String device;
+  private final TTimePartitionSlot timePartitionSlot;
+  private final String device;
   private List<ChunkHeader> chunkHeaderList;
-  private List<IChunkMetadata> chunkMetadataList;
 
+  private final PublicBAOS byteStream;
+  private final DataOutputStream stream;
   private List<long[]> timeBatch;
-  private List<Integer> satisfiedTimeBatchLength;
+  private long dataSize;
+  private boolean needDecodeChunk;
+  private List<Integer> pageNumbers;
+  private Queue<Integer> satisfiedLengthQueue;
 
   private AlignedChunkWriterImpl chunkWriter;
   private List<Chunk> chunkList;
 
-  public AlignedChunkData(long timeOffset, String device, ChunkHeader 
chunkHeader) {
-    this.offset = new ArrayList<>();
-    this.dataSize = new ArrayList<>();
-    this.isHeadPageNeedDecode = false;
-    this.isTailPageNeedDecode = false;
+  public AlignedChunkData(
+      String device, ChunkHeader chunkHeader, TTimePartitionSlot 
timePartitionSlot) {
+    this.dataSize = 0;
     this.device = device;
     this.chunkHeaderList = new ArrayList<>();
+    this.timePartitionSlot = timePartitionSlot;
+    this.needDecodeChunk = true;
+    this.pageNumbers = new ArrayList<>();
+    this.satisfiedLengthQueue = new LinkedList<>();
+    this.byteStream = new PublicBAOS();
+    this.stream = new DataOutputStream(byteStream);
 
-    offset.add(timeOffset);
-    dataSize.add(0L);
     chunkHeaderList.add(chunkHeader);
+    pageNumbers.add(0);
   }
 
   @Override
@@ -102,38 +99,12 @@ public class AlignedChunkData implements ChunkData {
 
   @Override
   public long getDataSize() {
-    return dataSize.stream().mapToLong(o -> o).sum();
-  }
-
-  @Override
-  public void addDataSize(long pageSize) {
-    dataSize.set(0, dataSize.get(0) + pageSize);
-  }
-
-  @Override
-  public void setNotDecode(IChunkMetadata chunkMetadata) {
-    chunkMetadataList = new ArrayList<>();
-    chunkMetadataList.add(chunkMetadata);
+    return dataSize;
   }
 
   @Override
-  public boolean needDecodeChunk() {
-    return chunkMetadataList == null;
-  }
-
-  @Override
-  public void setHeadPageNeedDecode(boolean headPageNeedDecode) {
-    isHeadPageNeedDecode = headPageNeedDecode;
-  }
-
-  @Override
-  public void setTailPageNeedDecode(boolean tailPageNeedDecode) {
-    isTailPageNeedDecode = tailPageNeedDecode;
-  }
-
-  @Override
-  public void setTimePartitionSlot(TTimePartitionSlot timePartitionSlot) {
-    this.timePartitionSlot = timePartitionSlot;
+  public void setNotDecode() {
+    needDecodeChunk = false;
   }
 
   @Override
@@ -152,196 +123,119 @@ public class AlignedChunkData implements ChunkData {
     }
   }
 
-  public void addValueChunk(long offset, ChunkHeader chunkHeader, 
IChunkMetadata chunkMetadata) {
-    this.offset.add(offset);
-    this.dataSize.add(0L);
+  public void addValueChunk(ChunkHeader chunkHeader) {
     this.chunkHeaderList.add(chunkHeader);
-    if (chunkMetadataList != null) {
-      chunkMetadataList.add(chunkMetadata);
-    }
-  }
-
-  public void addValueChunkDataSize(long dataSize) {
-    int lastIndex = this.dataSize.size() - 1;
-    this.dataSize.set(lastIndex, this.dataSize.get(lastIndex) + dataSize);
+    this.pageNumbers.add(0);
   }
 
   @Override
-  public void serialize(DataOutputStream stream, File tsFile) throws 
IOException {
+  public void serialize(DataOutputStream stream) throws IOException {
     ReadWriteIOUtils.write(isModification(), stream);
     ReadWriteIOUtils.write(isAligned(), stream);
     serializeAttr(stream);
-    serializeTsFileData(stream, tsFile);
+    byteStream.writeTo(stream);
+    close();
   }
 
   private void serializeAttr(DataOutputStream stream) throws IOException {
     ReadWriteIOUtils.write(timePartitionSlot.getStartTime(), stream);
     ReadWriteIOUtils.write(device, stream);
+    ReadWriteIOUtils.write(needDecodeChunk, stream);
     ReadWriteIOUtils.write(chunkHeaderList.size(), stream);
     for (ChunkHeader chunkHeader : chunkHeaderList) {
       chunkHeader.serializeTo(stream); // chunk header already serialize chunk 
type
     }
-  }
-
-  private void serializeTsFileData(DataOutputStream stream, File tsFile) 
throws IOException {
-    timeBatch = new ArrayList<>();
-    satisfiedTimeBatchLength = new ArrayList<>();
-    ReadWriteIOUtils.write(needDecodeChunk(), stream);
-    try (TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFile.getAbsolutePath())) {
-      int chunkSize = offset.size();
-      for (int i = 0; i < chunkSize; i++) {
-        if (needDecodeChunk()) {
-          serializeDecodeChunk(stream, reader, chunkHeaderList.get(i), i);
-        } else {
-          serializeEntireChunk(stream, reader, chunkHeaderList.get(i), 
chunkMetadataList.get(i));
-        }
+    if (needDecodeChunk) {
+      for (Integer pageNumber : pageNumbers) {
+        ReadWriteIOUtils.write(pageNumber, stream);
       }
     }
-    timeBatch = null;
-    satisfiedTimeBatchLength = null;
   }
 
-  private void serializeEntireChunk(
-      DataOutputStream stream,
-      TsFileSequenceReader reader,
-      ChunkHeader chunkHeader,
-      IChunkMetadata chunkMetadata)
+  @Override
+  public void writeEntireChunk(ByteBuffer chunkData, IChunkMetadata 
chunkMetadata)
       throws IOException {
-    ByteBuffer chunkData =
-        reader.readChunk(
-            chunkMetadata.getOffsetOfChunkHeader() + 
chunkHeader.getSerializedSize(),
-            chunkHeader.getDataSize());
-    ReadWriteIOUtils.write(chunkData, stream);
+    dataSize += ReadWriteIOUtils.write(chunkData, stream);
     chunkMetadata.getStatistics().serialize(stream);
   }
 
-  private void serializeDecodeChunk(
-      DataOutputStream stream, TsFileSequenceReader reader, ChunkHeader 
chunkHeader, int chunkIndex)
-      throws IOException {
-    Decoder defaultTimeDecoder =
-        Decoder.getDecoderByType(
-            
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
-            TSDataType.INT64);
-    Decoder valueDecoder =
-        Decoder.getDecoderByType(chunkHeader.getEncodingType(), 
chunkHeader.getDataType());
-
-    reader.position(offset.get(chunkIndex));
-    int decodePageIndex = 0; // should be 0,1 or 2
-    long dataSize = this.dataSize.get(chunkIndex);
-    while (dataSize > 0) {
-      boolean hasStatistic = (chunkHeader.getChunkType() & 0x3F) == 
MetaMarker.CHUNK_HEADER;
-      PageHeader pageHeader = reader.readPageHeader(chunkHeader.getDataType(), 
hasStatistic);
-      long pageDataSize = pageHeader.getSerializedPageSize();
-      if ((dataSize == this.dataSize.get(chunkIndex) && isHeadPageNeedDecode) 
// decode head page
-          || (dataSize == pageDataSize && isTailPageNeedDecode)) { // decode 
tail page
-        ReadWriteIOUtils.write(true, stream); // decode
-        if (chunkIndex == 0) {
-          decodeTimePage(reader, chunkHeader, pageHeader, defaultTimeDecoder, 
valueDecoder, stream);
-        } else {
-          decodeValuePage(reader, chunkHeader, pageHeader, decodePageIndex, 
valueDecoder, stream);
-        }
-        decodePageIndex += 1;
-      } else { // entire page
-        ReadWriteIOUtils.write(false, stream); // don't decode
-        pageHeader.serializeTo(stream);
-        ByteBuffer pageData = reader.readCompressedPage(pageHeader);
-        ReadWriteIOUtils.write(pageData, stream);
-      }
-      dataSize -= pageDataSize;
-    }
-
-    ReadWriteIOUtils.write(true, stream); // means ending
-    ReadWriteIOUtils.write(-1, stream);
+  @Override
+  public void writeEntirePage(PageHeader pageHeader, ByteBuffer pageData) 
throws IOException {
+    pageNumbers.set(pageNumbers.size() - 1, pageNumbers.get(pageNumbers.size() 
- 1) + 1);
+    dataSize += ReadWriteIOUtils.write(false, stream);
+    pageHeader.serializeTo(stream);
+    dataSize += ReadWriteIOUtils.write(pageData, stream);
   }
 
-  private void decodeTimePage(
-      TsFileSequenceReader reader,
-      ChunkHeader chunkHeader,
-      PageHeader pageHeader,
-      Decoder timeDecoder,
-      Decoder valueDecoder,
-      DataOutputStream stream)
+  @Override
+  public void writeDecodePage(long[] times, Object[] values, int 
satisfiedLength)
       throws IOException {
-    valueDecoder.reset();
-    ByteBuffer pageData = reader.readPage(pageHeader, 
chunkHeader.getCompressionType());
-    TimePageReader timePageReader = new TimePageReader(pageHeader, pageData, 
timeDecoder);
-    long[] decodeTime = timePageReader.getNextTimeBatch();
-    int satisfiedLength = 0;
-    long[] time = new long[decodeTime.length];
-    for (int i = 0; i < decodeTime.length; i++) {
-      if (decodeTime[i] < timePartitionSlot.getStartTime()) {
+    pageNumbers.set(pageNumbers.size() - 1, pageNumbers.get(pageNumbers.size() 
- 1) + 1);
+    satisfiedLengthQueue.offer(satisfiedLength);
+    long startTime = timePartitionSlot.getStartTime();
+    long endTime = startTime + 
TimePartitionUtils.getTimePartitionIntervalForRouting();
+    dataSize += ReadWriteIOUtils.write(true, stream);
+    dataSize += ReadWriteIOUtils.write(satisfiedLength, stream);
+
+    for (int i = 0; i < times.length; i++) {
+      if (times[i] < startTime) {
         continue;
-      } else if (!timePartitionSlot.equals(
-          TimePartitionUtils.getTimePartitionForRouting(decodeTime[i]))) {
+      } else if (times[i] >= endTime) {
         break;
       }
-      time[satisfiedLength++] = decodeTime[i];
+      dataSize += ReadWriteIOUtils.write(times[i], stream);
     }
-    ReadWriteIOUtils.write(satisfiedLength, stream);
-    for (int i = 0; i < satisfiedLength; i++) {
-      ReadWriteIOUtils.write(time[i], stream);
-    }
-    timeBatch.add(decodeTime);
-    satisfiedTimeBatchLength.add(satisfiedLength);
   }
 
-  private void decodeValuePage(
-      TsFileSequenceReader reader,
-      ChunkHeader chunkHeader,
-      PageHeader pageHeader,
-      int pageIndex,
-      Decoder valueDecoder,
-      DataOutputStream stream)
+  public void writeDecodeValuePage(long[] times, TsPrimitiveType[] values, 
TSDataType dataType)
       throws IOException {
-    valueDecoder.reset();
-    ByteBuffer pageData = reader.readPage(pageHeader, 
chunkHeader.getCompressionType());
-    ValuePageReader valuePageReader =
-        new ValuePageReader(pageHeader, pageData, chunkHeader.getDataType(), 
valueDecoder);
-    long[] time = timeBatch.get(pageIndex);
-    TsPrimitiveType[] valueBatch =
-        valuePageReader.nextValueBatch(
-            time); // should be origin time, so recording satisfied length is 
necessary
-    ReadWriteIOUtils.write(satisfiedTimeBatchLength.get(pageIndex), stream);
-    for (int i = 0; i < valueBatch.length; i++) {
-      if (time[i] < timePartitionSlot.getStartTime()) {
+    pageNumbers.set(pageNumbers.size() - 1, pageNumbers.get(pageNumbers.size() 
- 1) + 1);
+    long startTime = timePartitionSlot.getStartTime();
+    long endTime = startTime + 
TimePartitionUtils.getTimePartitionIntervalForRouting();
+    int satisfiedLength = satisfiedLengthQueue.poll();
+    dataSize += ReadWriteIOUtils.write(true, stream);
+    dataSize += ReadWriteIOUtils.write(satisfiedLength, stream);
+    satisfiedLengthQueue.offer(satisfiedLength);
+
+    for (int i = 0; i < times.length; i++) {
+      if (times[i] < startTime) {
         continue;
-      } else if (!timePartitionSlot.equals(
-          TimePartitionUtils.getTimePartitionForRouting(time[i]))) {
+      } else if (times[i] >= endTime) {
         break;
       }
-      if (valueBatch[i] == null) {
-        ReadWriteIOUtils.write(true, stream);
+
+      if (values[i] == null) {
+        dataSize += ReadWriteIOUtils.write(true, stream);
         continue;
       }
-      ReadWriteIOUtils.write(false, stream);
-      switch (chunkHeader.getDataType()) {
+      dataSize += ReadWriteIOUtils.write(false, stream);
+      switch (dataType) {
         case INT32:
-          ReadWriteIOUtils.write(valueBatch[i].getInt(), stream);
+          dataSize += ReadWriteIOUtils.write(values[i].getInt(), stream);
           break;
         case INT64:
-          ReadWriteIOUtils.write(valueBatch[i].getLong(), stream);
+          dataSize += ReadWriteIOUtils.write(values[i].getLong(), stream);
           break;
         case FLOAT:
-          ReadWriteIOUtils.write(valueBatch[i].getFloat(), stream);
+          dataSize += ReadWriteIOUtils.write(values[i].getFloat(), stream);
           break;
         case DOUBLE:
-          ReadWriteIOUtils.write(valueBatch[i].getDouble(), stream);
+          dataSize += ReadWriteIOUtils.write(values[i].getDouble(), stream);
           break;
         case BOOLEAN:
-          ReadWriteIOUtils.write(valueBatch[i].getBoolean(), stream);
+          dataSize += ReadWriteIOUtils.write(values[i].getBoolean(), stream);
           break;
         case TEXT:
-          ReadWriteIOUtils.write(valueBatch[i].getBinary(), stream);
+          dataSize += ReadWriteIOUtils.write(values[i].getBinary(), stream);
           break;
         default:
           throw new UnSupportedDataTypeException(
-              String.format("Data type %s is not supported.", 
chunkHeader.getDataType()));
+              String.format("Data type %s is not supported.", dataType));
       }
     }
   }
 
   private void deserializeTsFileData(InputStream stream) throws IOException, 
PageException {
-    boolean needDecodeChunk = ReadWriteIOUtils.readBool(stream);
     if (needDecodeChunk) {
       buildChunkWriter(stream);
     } else {
@@ -378,24 +272,24 @@ public class AlignedChunkData implements ChunkData {
     timeBatch = new ArrayList<>();
     int chunkHeaderSize = chunkHeaderList.size();
     for (int i = 0; i < chunkHeaderSize; i++) {
-      buildChunk(stream, chunkHeaderList.get(i), i - 1, i == 0);
+      buildChunk(stream, chunkHeaderList.get(i), pageNumbers.get(i), i - 1, i 
== 0);
     }
     timeBatch = null;
   }
 
   private void buildChunk(
-      InputStream stream, ChunkHeader chunkHeader, int valueChunkIndex, 
boolean isTimeChunk)
+      InputStream stream,
+      ChunkHeader chunkHeader,
+      int pageNumber,
+      int valueChunkIndex,
+      boolean isTimeChunk)
       throws IOException, PageException {
     boolean needDecode;
     int decodePageIndex = 0;
-    while (true) {
+    for (int j = 0; j < pageNumber; j++) {
       needDecode = ReadWriteIOUtils.readBool(stream);
       if (needDecode) {
         int length = ReadWriteIOUtils.readInt(stream);
-        if (length == -1) {
-          break;
-        }
-
         long[] timePageBatch = new long[length];
         if (!isTimeChunk) {
           timePageBatch = timeBatch.get(decodePageIndex);
@@ -465,42 +359,52 @@ public class AlignedChunkData implements ChunkData {
   }
 
   public static AlignedChunkData deserialize(InputStream stream) throws 
IOException, PageException {
-    long timePartition = ReadWriteIOUtils.readLong(stream);
+    TTimePartitionSlot timePartitionSlot =
+        
TimePartitionUtils.getTimePartitionForRouting(ReadWriteIOUtils.readLong(stream));
     String device = ReadWriteIOUtils.readString(stream);
+    boolean needDecodeChunk = ReadWriteIOUtils.readBool(stream);
     int chunkHeaderListSize = ReadWriteIOUtils.readInt(stream);
-    ChunkHeader[] chunkHeaderList = new ChunkHeader[chunkHeaderListSize];
+    List<ChunkHeader> chunkHeaderList = new ArrayList<>();
     for (int i = 0; i < chunkHeaderListSize; i++) {
       byte chunkType = ReadWriteIOUtils.readByte(stream);
-      chunkHeaderList[i] = ChunkHeader.deserializeFrom(stream, chunkType);
+      chunkHeaderList.add(ChunkHeader.deserializeFrom(stream, chunkType));
     }
-
-    AlignedChunkData chunkData = new AlignedChunkData(-1, device, 
chunkHeaderList[0]);
-    for (int i = 1; i < chunkHeaderListSize; i++) {
-      chunkData.addValueChunk(-1, chunkHeaderList[i], null);
+    List<Integer> pageNumbers = new ArrayList<>();
+    if (needDecodeChunk) {
+      for (int i = 0; i < chunkHeaderListSize; i++) {
+        pageNumbers.add(ReadWriteIOUtils.readInt(stream));
+      }
     }
-    
chunkData.setTimePartitionSlot(TimePartitionUtils.getTimePartitionForRouting(timePartition));
+
+    AlignedChunkData chunkData =
+        new AlignedChunkData(device, chunkHeaderList.get(0), 
timePartitionSlot);
+    chunkData.needDecodeChunk = needDecodeChunk;
+    chunkData.chunkHeaderList = chunkHeaderList;
+    chunkData.pageNumbers = pageNumbers;
     chunkData.deserializeTsFileData(stream);
+    chunkData.close();
     return chunkData;
   }
 
+  private void close() throws IOException {
+    byteStream.close();
+    stream.close();
+  }
+
   @Override
   public String toString() {
     return "AlignedChunkData{"
-        + "offset="
-        + offset
-        + ", dataSize="
-        + dataSize
-        + ", isHeadPageNeedDecode="
-        + isHeadPageNeedDecode
-        + ", isTailPageNeedDecode="
-        + isTailPageNeedDecode
-        + ", timePartitionSlot="
+        + "timePartitionSlot="
         + timePartitionSlot
         + ", device='"
         + device
         + '\''
         + ", chunkHeaderList="
         + chunkHeaderList
+        + ", totalDataSize="
+        + dataSize
+        + ", needDecodeChunk="
+        + needDecodeChunk
         + '}';
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/load/ChunkData.java b/server/src/main/java/org/apache/iotdb/db/engine/load/ChunkData.java
index bf5dae6256..0b19a43b9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/load/ChunkData.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/load/ChunkData.java
@@ -22,30 +22,28 @@ package org.apache.iotdb.db.engine.load;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.tsfile.exception.write.PageException;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 
 public interface ChunkData extends TsFileData {
   String getDevice();
 
   TTimePartitionSlot getTimePartitionSlot();
 
-  void addDataSize(long pageSize);
+  void setNotDecode();
 
-  void setNotDecode(IChunkMetadata chunkMetadata);
-
-  boolean needDecodeChunk();
-
-  void setHeadPageNeedDecode(boolean headPageNeedDecode);
+  boolean isAligned();
 
-  void setTailPageNeedDecode(boolean tailPageNeedDecode);
+  void writeEntireChunk(ByteBuffer chunkData, IChunkMetadata chunkMetadata) 
throws IOException;
 
-  void setTimePartitionSlot(TTimePartitionSlot timePartitionSlot);
+  void writeEntirePage(PageHeader pageHeader, ByteBuffer pageData) throws 
IOException;
 
-  boolean isAligned();
+  void writeDecodePage(long[] times, Object[] values, int satisfiedLength) 
throws IOException;
 
   @Override
   default boolean isModification() {
@@ -60,9 +58,12 @@ public interface ChunkData extends TsFileData {
   }
 
   static ChunkData createChunkData(
-      boolean isAligned, long offset, String device, ChunkHeader chunkHeader) {
+      boolean isAligned,
+      String device,
+      ChunkHeader chunkHeader,
+      TTimePartitionSlot timePartitionSlot) {
     return isAligned
-        ? new AlignedChunkData(offset, device, chunkHeader)
-        : new NonAlignedChunkData(offset, device, chunkHeader);
+        ? new AlignedChunkData(device, chunkHeader, timePartitionSlot)
+        : new NonAlignedChunkData(device, chunkHeader, timePartitionSlot);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/load/DeletionData.java b/server/src/main/java/org/apache/iotdb/db/engine/load/DeletionData.java
index 2b9cfe8e76..8fcfa552d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/load/DeletionData.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/load/DeletionData.java
@@ -40,7 +40,7 @@ public class DeletionData implements TsFileData {
 
   @Override
   public long getDataSize() {
-    return 0;
+    return Long.BYTES;
   }
 
   @Override
@@ -60,7 +60,7 @@ public class DeletionData implements TsFileData {
   }
 
   @Override
-  public void serialize(DataOutputStream stream, File tsFile) throws 
IOException {
+  public void serialize(DataOutputStream stream) throws IOException {
     ReadWriteIOUtils.write(isModification(), stream);
     deletion.serializeWithoutFileOffset(stream);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/load/LoadTsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/load/LoadTsFileManager.java
index 57119901cb..fa9f7abd9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/load/LoadTsFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/load/LoadTsFileManager.java
@@ -28,14 +28,13 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
 import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.load.LoadTsFilePieceNode;
 import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler;
 import 
org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.exception.write.PageException;
-import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import org.slf4j.Logger;
@@ -44,7 +43,6 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -235,18 +233,7 @@ public class LoadTsFileManager {
     }
 
     private TsFileResource generateResource(TsFileIOWriter writer) throws 
IOException {
-      TsFileResource tsFileResource = new TsFileResource(writer.getFile());
-      Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap =
-          writer.getDeviceTimeseriesMetadataMap();
-      for (Map.Entry<String, List<TimeseriesMetadata>> entry :
-          deviceTimeseriesMetadataMap.entrySet()) {
-        String device = entry.getKey();
-        for (TimeseriesMetadata timeseriesMetaData : entry.getValue()) {
-          tsFileResource.updateStartTime(device, 
timeseriesMetaData.getStatistics().getStartTime());
-          tsFileResource.updateEndTime(device, 
timeseriesMetaData.getStatistics().getEndTime());
-        }
-      }
-      tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
+      TsFileResource tsFileResource = 
FileLoaderUtils.generateTsFileResource(writer);
       tsFileResource.serialize();
       return tsFileResource;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/load/NonAlignedChunkData.java b/server/src/main/java/org/apache/iotdb/db/engine/load/NonAlignedChunkData.java
index c3d378a2b5..3091988f9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/load/NonAlignedChunkData.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/load/NonAlignedChunkData.java
@@ -21,55 +21,51 @@ package org.apache.iotdb.db.engine.load;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.db.utils.TimePartitionUtils;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
 import org.apache.iotdb.tsfile.exception.write.PageException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
-import org.apache.iotdb.tsfile.file.MetaMarker;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.reader.page.PageReader;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 
 public class NonAlignedChunkData implements ChunkData {
-  private long offset;
-  private long dataSize;
-  private boolean isHeadPageNeedDecode;
-  private boolean isTailPageNeedDecode;
 
-  private TTimePartitionSlot timePartitionSlot;
-  private String device;
-  private ChunkHeader chunkHeader;
-  private IChunkMetadata chunkMetadata;
+  private final TTimePartitionSlot timePartitionSlot;
+  private final String device;
+  private final ChunkHeader chunkHeader;
+
+  private final PublicBAOS byteStream;
+  private final DataOutputStream stream;
+  private long dataSize;
+  private boolean needDecodeChunk;
+  private int pageNumber;
 
   private ChunkWriterImpl chunkWriter;
   private Chunk chunk;
 
-  public NonAlignedChunkData(long offset, String device, ChunkHeader 
chunkHeader) {
-    this.offset = offset;
+  public NonAlignedChunkData(
+      String device, ChunkHeader chunkHeader, TTimePartitionSlot 
timePartitionSlot) {
     this.dataSize = 0;
-    this.isHeadPageNeedDecode = false;
-    this.isTailPageNeedDecode = false;
     this.device = device;
     this.chunkHeader = chunkHeader;
+    this.timePartitionSlot = timePartitionSlot;
+    this.needDecodeChunk = true;
+    this.pageNumber = 0;
+    this.byteStream = new PublicBAOS();
+    this.stream = new DataOutputStream(byteStream);
   }
 
   @Override
@@ -88,33 +84,8 @@ public class NonAlignedChunkData implements ChunkData {
   }
 
   @Override
-  public void addDataSize(long pageSize) {
-    dataSize += pageSize;
-  }
-
-  @Override
-  public void setNotDecode(IChunkMetadata chunkMetadata) {
-    this.chunkMetadata = chunkMetadata;
-  }
-
-  @Override
-  public boolean needDecodeChunk() {
-    return chunkMetadata == null;
-  }
-
-  @Override
-  public void setHeadPageNeedDecode(boolean headPageNeedDecode) {
-    isHeadPageNeedDecode = headPageNeedDecode;
-  }
-
-  @Override
-  public void setTailPageNeedDecode(boolean tailPageNeedDecode) {
-    isTailPageNeedDecode = tailPageNeedDecode;
-  }
-
-  @Override
-  public void setTimePartitionSlot(TTimePartitionSlot timePartitionSlot) {
-    this.timePartitionSlot = timePartitionSlot;
+  public void setNotDecode() {
+    needDecodeChunk = false;
   }
 
   @Override
@@ -132,137 +103,83 @@ public class NonAlignedChunkData implements ChunkData {
   }
 
   @Override
-  public void serialize(DataOutputStream stream, File tsFile) throws 
IOException {
+  public void serialize(DataOutputStream stream) throws IOException {
     ReadWriteIOUtils.write(isModification(), stream);
     ReadWriteIOUtils.write(isAligned(), stream);
     serializeAttr(stream);
-    if (needDecodeChunk()) {
-      ReadWriteIOUtils.write(true, stream);
-      serializeDecodeChunk(stream, tsFile);
-    } else {
-      ReadWriteIOUtils.write(false, stream);
-      serializeEntireChunk(stream, tsFile);
-    }
+    byteStream.writeTo(stream);
+    close();
   }
 
   private void serializeAttr(DataOutputStream stream) throws IOException {
     ReadWriteIOUtils.write(timePartitionSlot.getStartTime(), stream);
     ReadWriteIOUtils.write(device, stream);
+    ReadWriteIOUtils.write(needDecodeChunk, stream);
     chunkHeader.serializeTo(stream); // chunk header already serialize chunk 
type
-  }
-
-  private void serializeEntireChunk(DataOutputStream stream, File tsFile) 
throws IOException {
-    try (TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFile.getAbsolutePath())) {
-      ByteBuffer chunkData =
-          reader.readChunk(
-              chunkMetadata.getOffsetOfChunkHeader() + 
chunkHeader.getSerializedSize(),
-              chunkHeader.getDataSize());
-      ReadWriteIOUtils.write(chunkData, stream);
-      chunkMetadata.getStatistics().serialize(stream);
+    if (needDecodeChunk) {
+      ReadWriteIOUtils.write(pageNumber, stream);
     }
   }
 
-  private void serializeDecodeChunk(DataOutputStream stream, File tsFile) 
throws IOException {
-    try (TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFile.getAbsolutePath())) {
-      Decoder defaultTimeDecoder =
-          Decoder.getDecoderByType(
-              
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
-              TSDataType.INT64);
-      Decoder valueDecoder =
-          Decoder.getDecoderByType(chunkHeader.getEncodingType(), 
chunkHeader.getDataType());
-
-      reader.position(offset);
-      long dataSize = this.dataSize;
-      while (dataSize > 0) {
-        boolean hasStatistic = (chunkHeader.getChunkType() & 0x3F) == 
MetaMarker.CHUNK_HEADER;
-        PageHeader pageHeader = 
reader.readPageHeader(chunkHeader.getDataType(), hasStatistic);
-        long pageDataSize = pageHeader.getSerializedPageSize();
-        if ((dataSize == this.dataSize && isHeadPageNeedDecode) // decode head 
page
-            || (dataSize == pageDataSize && isTailPageNeedDecode)) { // decode 
tail page
-          ReadWriteIOUtils.write(true, stream); // decode
-          decodePage(reader, pageHeader, defaultTimeDecoder, valueDecoder, 
stream);
-        } else { // entire page
-          ReadWriteIOUtils.write(false, stream); // don't decode
-          pageHeader.serializeTo(stream);
-          ByteBuffer pageData = reader.readCompressedPage(pageHeader);
-          ReadWriteIOUtils.write(pageData, stream);
-        }
-        dataSize -= pageDataSize;
-      }
-    }
+  @Override
+  public void writeEntireChunk(ByteBuffer chunkData, IChunkMetadata 
chunkMetadata)
+      throws IOException {
+    dataSize += ReadWriteIOUtils.write(chunkData, stream);
+    dataSize += chunkMetadata.getStatistics().serialize(stream);
+  }
 
-    ReadWriteIOUtils.write(true, stream); // means ending
-    ReadWriteIOUtils.write(-1, stream);
+  @Override
+  public void writeEntirePage(PageHeader pageHeader, ByteBuffer pageData) 
throws IOException {
+    pageNumber += 1;
+    dataSize += ReadWriteIOUtils.write(false, stream);
+    pageHeader.serializeTo(stream);
+    dataSize += ReadWriteIOUtils.write(pageData, stream);
   }
 
-  private void decodePage(
-      TsFileSequenceReader reader,
-      PageHeader pageHeader,
-      Decoder timeDecoder,
-      Decoder valueDecoder,
-      DataOutputStream stream)
+  @Override
+  public void writeDecodePage(long[] times, Object[] values, int 
satisfiedLength)
       throws IOException {
-    valueDecoder.reset();
-    ByteBuffer pageData = reader.readPage(pageHeader, 
chunkHeader.getCompressionType());
-    PageReader pageReader =
-        new PageReader(pageData, chunkHeader.getDataType(), valueDecoder, 
timeDecoder, null);
-    BatchData batchData = pageReader.getAllSatisfiedPageData();
-
-    int length = 0;
-    while (batchData.hasCurrent()) {
-      long time = batchData.currentTime();
-      if (time < timePartitionSlot.getStartTime()) {
-        batchData.next();
+    pageNumber += 1;
+    long startTime = timePartitionSlot.getStartTime();
+    long endTime = startTime + 
TimePartitionUtils.getTimePartitionIntervalForRouting();
+    dataSize += ReadWriteIOUtils.write(true, stream);
+    dataSize += ReadWriteIOUtils.write(satisfiedLength, stream);
+
+    for (int i = 0; i < times.length; i++) {
+      if (times[i] < startTime) {
         continue;
-      } else if 
(!timePartitionSlot.equals(TimePartitionUtils.getTimePartitionForRouting(time)))
 {
+      } else if (times[i] >= endTime) {
         break;
       }
-      length += 1;
-      batchData.next();
-    }
 
-    ReadWriteIOUtils.write(length, stream);
-    batchData.resetBatchData();
-    while (batchData.hasCurrent()) {
-      long time = batchData.currentTime();
-      if (time < timePartitionSlot.getStartTime()) {
-        batchData.next();
-        continue;
-      } else if 
(!timePartitionSlot.equals(TimePartitionUtils.getTimePartitionForRouting(time)))
 {
-        break;
-      }
-
-      ReadWriteIOUtils.write(time, stream);
-      Object value = batchData.currentValue();
+      dataSize += ReadWriteIOUtils.write(times[i], stream);
       switch (chunkHeader.getDataType()) {
         case INT32:
-          ReadWriteIOUtils.write((int) value, stream);
+          dataSize += ReadWriteIOUtils.write((int) values[i], stream);
           break;
         case INT64:
-          ReadWriteIOUtils.write((long) value, stream);
+          dataSize += ReadWriteIOUtils.write((long) values[i], stream);
           break;
         case FLOAT:
-          ReadWriteIOUtils.write((float) value, stream);
+          dataSize += ReadWriteIOUtils.write((float) values[i], stream);
           break;
         case DOUBLE:
-          ReadWriteIOUtils.write((double) value, stream);
+          dataSize += ReadWriteIOUtils.write((double) values[i], stream);
           break;
         case BOOLEAN:
-          ReadWriteIOUtils.write((boolean) value, stream);
+          dataSize += ReadWriteIOUtils.write((boolean) values[i], stream);
           break;
         case TEXT:
-          ReadWriteIOUtils.write((Binary) value, stream);
+          dataSize += ReadWriteIOUtils.write((Binary) values[i], stream);
           break;
         default:
           throw new UnSupportedDataTypeException(
               String.format("Data type %s is not supported.", 
chunkHeader.getDataType()));
       }
-      batchData.next();
     }
   }
 
   private void deserializeTsFileData(InputStream stream) throws IOException, 
PageException {
-    boolean needDecodeChunk = ReadWriteIOUtils.readBool(stream);
     if (needDecodeChunk) {
       buildChunkWriter(stream);
     } else {
@@ -270,6 +187,14 @@ public class NonAlignedChunkData implements ChunkData {
     }
   }
 
+  private void deserializeEntireChunk(InputStream stream) throws IOException {
+    ByteBuffer chunkData =
+        
ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(stream));
+    Statistics<? extends Serializable> statistics =
+        Statistics.deserialize(stream, chunkHeader.getDataType());
+    chunk = new Chunk(chunkHeader, chunkData, null, statistics);
+  }
+
   private void buildChunkWriter(InputStream stream) throws IOException, 
PageException {
     chunkWriter =
         new ChunkWriterImpl(
@@ -279,14 +204,10 @@ public class NonAlignedChunkData implements ChunkData {
                 chunkHeader.getEncodingType(),
                 chunkHeader.getCompressionType()));
     boolean needDecode;
-    while (true) {
+    for (int j = 0; j < pageNumber; j++) {
       needDecode = ReadWriteIOUtils.readBool(stream);
       if (needDecode) {
         int length = ReadWriteIOUtils.readInt(stream);
-        if (length == -1) {
-          break;
-        }
-
         for (int i = 0; i < length; i++) {
           long time = ReadWriteIOUtils.readLong(stream);
           switch (chunkHeader.getDataType()) {
@@ -324,37 +245,37 @@ public class NonAlignedChunkData implements ChunkData {
     }
   }
 
-  private void deserializeEntireChunk(InputStream stream) throws IOException {
-    ByteBuffer chunkData =
-        
ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(stream));
-    Statistics<? extends Serializable> statistics =
-        Statistics.deserialize(stream, chunkHeader.getDataType());
-    chunk = new Chunk(chunkHeader, chunkData, null, statistics);
-  }
-
   public static NonAlignedChunkData deserialize(InputStream stream)
       throws IOException, PageException {
-    long timePartition = ReadWriteIOUtils.readLong(stream);
+    TTimePartitionSlot timePartitionSlot =
+        
TimePartitionUtils.getTimePartitionForRouting(ReadWriteIOUtils.readLong(stream));
     String device = ReadWriteIOUtils.readString(stream);
+    boolean needDecodeChunk = ReadWriteIOUtils.readBool(stream);
     byte chunkType = ReadWriteIOUtils.readByte(stream);
     ChunkHeader chunkHeader = ChunkHeader.deserializeFrom(stream, chunkType);
-    NonAlignedChunkData chunkData = new NonAlignedChunkData(-1, device, 
chunkHeader);
-    
chunkData.setTimePartitionSlot(TimePartitionUtils.getTimePartitionForRouting(timePartition));
+    int pageNumber = 0;
+    if (needDecodeChunk) {
+      pageNumber = ReadWriteIOUtils.readInt(stream);
+    }
+
+    NonAlignedChunkData chunkData = new NonAlignedChunkData(device, 
chunkHeader, timePartitionSlot);
+    chunkData.needDecodeChunk = needDecodeChunk;
+    chunkData.pageNumber = pageNumber;
     chunkData.deserializeTsFileData(stream);
+    chunkData.close();
     return chunkData;
   }
 
+  private void close() throws IOException {
+    byteStream.close();
+    stream.close();
+  }
+
   @Override
   public String toString() {
     return "NonAlignedChunkData{"
-        + "offset="
-        + offset
-        + ", dataSize="
+        + "dataSize="
         + dataSize
-        + ", isHeadPageNeedDecode="
-        + isHeadPageNeedDecode
-        + ", isTailPageNeedDecode="
-        + isTailPageNeedDecode
         + ", timePartitionSlot="
         + timePartitionSlot
         + ", device='"
@@ -362,6 +283,8 @@ public class NonAlignedChunkData implements ChunkData {
         + '\''
         + ", chunkHeader="
         + chunkHeader
+        + ", needDecodeChunk="
+        + needDecodeChunk
         + '}';
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/load/TsFileData.java 
b/server/src/main/java/org/apache/iotdb/db/engine/load/TsFileData.java
index 7f99064ff2..0fd9c5751f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/load/TsFileData.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/load/TsFileData.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -36,7 +35,7 @@ public interface TsFileData {
 
   boolean isModification();
 
-  void serialize(DataOutputStream stream, File tsFile) throws IOException;
+  void serialize(DataOutputStream stream) throws IOException;
 
   static TsFileData deserialize(InputStream stream)
       throws IOException, PageException, IllegalPathException {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
 b/server/src/main/java/org/apache/iotdb/db/engine/load/TsFileSplitter.java
similarity index 53%
copy from 
server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
copy to server/src/main/java/org/apache/iotdb/db/engine/load/TsFileSplitter.java
index 03bd5f3bae..edaa692eac 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/load/TsFileSplitter.java
@@ -17,32 +17,17 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.planner.plan.node.load;
+package org.apache.iotdb.db.engine.load;
 
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.partition.DataPartition;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.load.AlignedChunkData;
-import org.apache.iotdb.db.engine.load.ChunkData;
-import org.apache.iotdb.db.engine.load.DeletionData;
-import org.apache.iotdb.db.engine.load.TsFileData;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.utils.TimePartitionUtils;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
-import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.exception.TsFileRuntimeException;
 import org.apache.iotdb.tsfile.file.MetaMarker;
 import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
@@ -56,15 +41,16 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.reader.page.PageReader;
 import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
+import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -72,147 +58,20 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.stream.Collectors;
+import java.util.function.Function;
 
-public class LoadSingleTsFileNode extends WritePlanNode {
-  private static final Logger logger = 
LoggerFactory.getLogger(LoadSingleTsFileNode.class);
+public class TsFileSplitter {
+  private static final Logger logger = 
LoggerFactory.getLogger(TsFileSplitter.class);
 
-  private File tsFile;
-  private boolean needDecodeTsFile;
+  private final File tsFile;
+  private final Function<TsFileData, Boolean> consumer;
 
-  private Map<TRegionReplicaSet, List<LoadTsFilePieceNode>> replicaSet2Pieces;
-
-  private TsFileResource resource;
-  private TRegionReplicaSet localRegionReplicaSet;
-
-  private boolean deleteAfterLoad;
-
-  public LoadSingleTsFileNode(PlanNodeId id) {
-    super(id);
-  }
-
-  public LoadSingleTsFileNode(PlanNodeId id, TsFileResource resource, boolean 
deleteAfterLoad) {
-    super(id);
-    this.tsFile = resource.getTsFile();
-    this.resource = resource;
-    this.deleteAfterLoad = deleteAfterLoad;
-  }
-
-  public void checkIfNeedDecodeTsFile(DataPartition dataPartition) throws 
IOException {
-    Set<TRegionReplicaSet> allRegionReplicaSet = new HashSet<>();
-    needDecodeTsFile = false;
-    for (String device : resource.getDevices()) {
-      if 
(!TimePartitionUtils.getTimePartitionForRouting(resource.getStartTime(device))
-          
.equals(TimePartitionUtils.getTimePartitionForRouting(resource.getEndTime(device))))
 {
-        needDecodeTsFile = true;
-        return;
-      }
-      
allRegionReplicaSet.addAll(dataPartition.getAllDataRegionReplicaSetForOneDevice(device));
-    }
-    needDecodeTsFile = !isDispatchedToLocal(allRegionReplicaSet);
-    if (!needDecodeTsFile && !resource.resourceFileExists()) {
-      resource.serialize();
-    }
-  }
-
-  private boolean isDispatchedToLocal(Set<TRegionReplicaSet> replicaSets) {
-    if (replicaSets.size() > 1) {
-      return false;
-    }
-    for (TRegionReplicaSet replicaSet : replicaSets) {
-      List<TDataNodeLocation> dataNodeLocationList = 
replicaSet.getDataNodeLocations();
-      if (dataNodeLocationList.size() > 1) {
-        return false;
-      }
-      localRegionReplicaSet = replicaSet;
-      return 
isDispatchedToLocal(dataNodeLocationList.get(0).getInternalEndPoint());
-    }
-    return true;
-  }
-
-  private boolean isDispatchedToLocal(TEndPoint endPoint) {
-    return 
IoTDBDescriptor.getInstance().getConfig().getInternalAddress().equals(endPoint.getIp())
-        && IoTDBDescriptor.getInstance().getConfig().getInternalPort() == 
endPoint.port;
-  }
-
-  public boolean needDecodeTsFile() {
-    return needDecodeTsFile;
-  }
-
-  public boolean isDeleteAfterLoad() {
-    return deleteAfterLoad;
-  }
-
-  /**
-   * only used for load locally.
-   *
-   * @return local TRegionReplicaSet
-   */
-  public TRegionReplicaSet getLocalRegionReplicaSet() {
-    return localRegionReplicaSet;
-  }
-
-  public TsFileResource getTsFileResource() {
-    return resource;
-  }
-
-  public Map<TRegionReplicaSet, List<LoadTsFilePieceNode>> 
getReplicaSet2Pieces() {
-    return replicaSet2Pieces;
-  }
-
-  @Override
-  public TRegionReplicaSet getRegionReplicaSet() {
-    return null;
-  }
-
-  @Override
-  public List<PlanNode> getChildren() {
-    return null;
-  }
-
-  @Override
-  public void addChild(PlanNode child) {}
-
-  @Override
-  public PlanNode clone() {
-    throw new NotImplementedException("clone of load single TsFile is not 
implemented");
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return NO_CHILD_ALLOWED;
-  }
-
-  @Override
-  public List<String> getOutputColumnNames() {
-    return null;
-  }
-
-  @Override
-  protected void serializeAttributes(ByteBuffer byteBuffer) {}
-
-  @Override
-  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {}
-
-  @Override
-  public List<WritePlanNode> splitByPartition(Analysis analysis) {
-    throw new NotImplementedException("split load single TsFile is not 
implemented");
+  public TsFileSplitter(File tsFile, Function<TsFileData, Boolean> consumer) {
+    this.tsFile = tsFile;
+    this.consumer = consumer;
   }
 
-  @Override
-  public String toString() {
-    return "LoadSingleTsFileNode{"
-        + "tsFile="
-        + tsFile
-        + ", needDecodeTsFile="
-        + needDecodeTsFile
-        + '}';
-  }
-
-  public void splitTsFileByDataPartition(DataPartition dataPartition) throws 
IOException {
-    replicaSet2Pieces = new HashMap<>();
-    List<TsFileData> tsFileDataList = new ArrayList<>();
-
+  public void splitTsFileByDataPartition() throws IOException, 
IllegalStateException {
     try (TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFile.getAbsolutePath())) {
       TreeMap<Long, List<Deletion>> offset2Deletions = new TreeMap<>();
       getAllModification(offset2Deletions);
@@ -225,7 +84,8 @@ public class LoadSingleTsFileNode extends WritePlanNode {
       reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
       String curDevice = null;
       boolean isTimeChunkNeedDecode = true;
-      Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData = null;
+      Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData = new 
HashMap<>();
+      Map<Integer, long[]> pageIndex2Times = null;
       Map<Long, IChunkMetadata> offset2ChunkMetadata = new HashMap<>();
       getChunkMetadata(reader, offset2ChunkMetadata);
       byte marker;
@@ -236,12 +96,16 @@ public class LoadSingleTsFileNode extends WritePlanNode {
           case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
           case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
             long chunkOffset = reader.position();
-            handleModification(offset2Deletions, tsFileDataList, chunkOffset);
+            consumeAllAlignedChunkData(chunkOffset, pageIndex2ChunkData);
+            handleModification(offset2Deletions, chunkOffset);
 
             ChunkHeader header = reader.readChunkHeader(marker);
+            String measurementId = header.getMeasurementID();
             if (header.getDataSize() == 0) {
               throw new TsFileRuntimeException(
-                  String.format("Chunk data error when parsing TsFile %s.", 
tsFile.getPath()));
+                  String.format(
+                      "Empty Nonaligned Chunk or Time Chunk with offset %d in 
TsFile %s.",
+                      chunkOffset, tsFile.getPath()));
             }
 
             boolean isAligned =
@@ -251,26 +115,21 @@ public class LoadSingleTsFileNode extends WritePlanNode {
             TTimePartitionSlot timePartitionSlot =
                 
TimePartitionUtils.getTimePartitionForRouting(chunkMetadata.getStartTime());
             ChunkData chunkData =
-                ChunkData.createChunkData(isAligned, reader.position(), 
curDevice, header);
-            chunkData.setTimePartitionSlot(timePartitionSlot);
+                ChunkData.createChunkData(isAligned, curDevice, header, 
timePartitionSlot);
+
             if (!needDecodeChunk(chunkMetadata)) {
+              chunkData.setNotDecode();
+              chunkData.writeEntireChunk(reader.readChunk(-1, 
header.getDataSize()), chunkMetadata);
               if (isAligned) {
                 isTimeChunkNeedDecode = false;
-                pageIndex2ChunkData = new HashMap<>();
                 pageIndex2ChunkData
                     .computeIfAbsent(1, o -> new ArrayList<>())
                     .add((AlignedChunkData) chunkData);
+              } else {
+                consumeChunkData(measurementId, chunkOffset, chunkData);
               }
-              chunkData.setNotDecode(chunkMetadata);
-              chunkData.addDataSize(header.getDataSize());
-              tsFileDataList.add(chunkData);
-              reader.position(reader.position() + header.getDataSize());
               break;
             }
-            if (isAligned) {
-              isTimeChunkNeedDecode = true;
-              pageIndex2ChunkData = new HashMap<>();
-            }
 
             Decoder defaultTimeDecoder =
                 Decoder.getDecoderByType(
@@ -280,8 +139,12 @@ public class LoadSingleTsFileNode extends WritePlanNode {
                 Decoder.getDecoderByType(header.getEncodingType(), 
header.getDataType());
             int dataSize = header.getDataSize();
             int pageIndex = 0;
+            if (isAligned) {
+              isTimeChunkNeedDecode = true;
+              pageIndex2Times = new HashMap<>();
+            }
+
             while (dataSize > 0) {
-              long pageOffset = reader.position();
               PageHeader pageHeader =
                   reader.readPageHeader(
                       header.getDataType(),
@@ -295,50 +158,56 @@ public class LoadSingleTsFileNode extends WritePlanNode {
                 TTimePartitionSlot pageTimePartitionSlot =
                     TimePartitionUtils.getTimePartitionForRouting(startTime);
                 if (!timePartitionSlot.equals(pageTimePartitionSlot)) {
-                  tsFileDataList.add(chunkData);
+                  if (!isAligned) {
+                    consumeChunkData(measurementId, chunkOffset, chunkData);
+                  }
                   timePartitionSlot = pageTimePartitionSlot;
-                  chunkData = ChunkData.createChunkData(isAligned, pageOffset, 
curDevice, header);
-                  chunkData.setTimePartitionSlot(timePartitionSlot);
+                  chunkData =
+                      ChunkData.createChunkData(isAligned, curDevice, header, 
timePartitionSlot);
                 }
                 if (isAligned) {
                   pageIndex2ChunkData
                       .computeIfAbsent(pageIndex, o -> new ArrayList<>())
                       .add((AlignedChunkData) chunkData);
                 }
-                chunkData.addDataSize(pageDataSize);
-                reader.position(pageOffset + pageDataSize);
+                chunkData.writeEntirePage(pageHeader, 
reader.readCompressedPage(pageHeader));
               } else { // split page
                 ByteBuffer pageData = reader.readPage(pageHeader, 
header.getCompressionType());
-                long[] timeBatch =
+                Pair<long[], Object[]> tvArray =
                     decodePage(
                         isAligned, pageData, pageHeader, defaultTimeDecoder, 
valueDecoder, header);
-                boolean isFirstData = true;
-                for (long currentTime : timeBatch) {
-                  TTimePartitionSlot currentTimePartitionSlot =
-                      TimePartitionUtils.getTimePartitionForRouting(
-                          currentTime); // TODO: can speed up
-                  if (!timePartitionSlot.equals(currentTimePartitionSlot)) {
-                    if (!isFirstData) {
-                      chunkData.setTailPageNeedDecode(true); // close last 
chunk data
-                      chunkData.addDataSize(pageDataSize);
-                      if (isAligned) {
-                        pageIndex2ChunkData
-                            .computeIfAbsent(pageIndex, o -> new ArrayList<>())
-                            .add((AlignedChunkData) chunkData);
-                      }
+                long[] times = tvArray.left;
+                Object[] values = tvArray.right;
+                if (isAligned) {
+                  pageIndex2Times.put(pageIndex, times);
+                }
+
+                int satisfiedLength = 0;
+                long endTime =
+                    timePartitionSlot.getStartTime()
+                        + 
TimePartitionUtils.getTimePartitionIntervalForRouting();
+                for (int i = 0; i < times.length; i++) {
+                  if (times[i] >= endTime) {
+                    chunkData.writeDecodePage(times, values, satisfiedLength);
+                    if (isAligned) {
+                      pageIndex2ChunkData
+                          .computeIfAbsent(pageIndex, o -> new ArrayList<>())
+                          .add((AlignedChunkData) chunkData);
+                    } else {
+                      consumeChunkData(measurementId, chunkOffset, chunkData);
                     }
-                    tsFileDataList.add(chunkData);
 
+                    timePartitionSlot = 
TimePartitionUtils.getTimePartitionForRouting(times[i]);
+                    satisfiedLength = 0;
+                    endTime =
+                        timePartitionSlot.getStartTime()
+                            + 
TimePartitionUtils.getTimePartitionIntervalForRouting();
                     chunkData =
-                        ChunkData.createChunkData(
-                            isAligned, pageOffset, curDevice, header); // open 
a new chunk data
-                    chunkData.setTimePartitionSlot(currentTimePartitionSlot);
-                    chunkData.setHeadPageNeedDecode(true);
-                    timePartitionSlot = currentTimePartitionSlot;
+                        ChunkData.createChunkData(isAligned, curDevice, 
header, timePartitionSlot);
                   }
-                  isFirstData = false;
+                  satisfiedLength += 1;
                 }
-                chunkData.addDataSize(pageDataSize);
+                chunkData.writeDecodePage(times, values, satisfiedLength);
                 if (isAligned) {
                   pageIndex2ChunkData
                       .computeIfAbsent(pageIndex, o -> new ArrayList<>())
@@ -350,7 +219,9 @@ public class LoadSingleTsFileNode extends WritePlanNode {
               dataSize -= pageDataSize;
             }
 
-            tsFileDataList.add(chunkData);
+            if (!isAligned) {
+              consumeChunkData(measurementId, chunkOffset, chunkData);
+            }
             break;
           case MetaMarker.VALUE_CHUNK_HEADER:
           case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
@@ -358,37 +229,48 @@ public class LoadSingleTsFileNode extends WritePlanNode {
             chunkMetadata = offset2ChunkMetadata.get(chunkOffset - Byte.BYTES);
             header = reader.readChunkHeader(marker);
             if (header.getDataSize() == 0) {
-              handleEmptyValueChunk(chunkOffset, header, chunkMetadata, 
pageIndex2ChunkData);
+              handleEmptyValueChunk(header, pageIndex2ChunkData);
               break;
             }
 
-            Set<ChunkData> allChunkData = new HashSet<>();
             if (!isTimeChunkNeedDecode) {
               AlignedChunkData alignedChunkData = 
pageIndex2ChunkData.get(1).get(0);
-              alignedChunkData.addValueChunk(chunkOffset, header, 
chunkMetadata);
-              alignedChunkData.addValueChunkDataSize(header.getDataSize());
-              reader.position(reader.position() + header.getDataSize());
+              alignedChunkData.addValueChunk(header);
+              alignedChunkData.writeEntireChunk(
+                  reader.readChunk(-1, header.getDataSize()), chunkMetadata);
               break;
             }
 
+            Set<ChunkData> allChunkData = new HashSet<>();
             dataSize = header.getDataSize();
             pageIndex = 0;
+            valueDecoder = Decoder.getDecoderByType(header.getEncodingType(), 
header.getDataType());
 
             while (dataSize > 0) {
-              long pageOffset = reader.position();
               PageHeader pageHeader =
                   reader.readPageHeader(
                       header.getDataType(),
                       (header.getChunkType() & 0x3F) == 
MetaMarker.CHUNK_HEADER);
               long pageDataSize = pageHeader.getSerializedPageSize();
-              for (AlignedChunkData alignedChunkData : 
pageIndex2ChunkData.get(pageIndex)) {
+              List<AlignedChunkData> alignedChunkDataList = 
pageIndex2ChunkData.get(pageIndex);
+              for (AlignedChunkData alignedChunkData : alignedChunkDataList) {
                 if (!allChunkData.contains(alignedChunkData)) {
-                  alignedChunkData.addValueChunk(pageOffset, header, 
chunkMetadata);
+                  alignedChunkData.addValueChunk(header);
                   allChunkData.add(alignedChunkData);
                 }
-                alignedChunkData.addValueChunkDataSize(pageDataSize);
               }
-              reader.position(pageOffset + pageDataSize);
+              if (alignedChunkDataList.size() == 1) { // write entire page
+                alignedChunkDataList
+                    .get(0)
+                    .writeEntirePage(pageHeader, 
reader.readCompressedPage(pageHeader));
+              } else { // decode page
+                long[] times = pageIndex2Times.get(pageIndex);
+                TsPrimitiveType[] values =
+                    decodeValuePage(reader, header, pageHeader, times, 
valueDecoder);
+                for (AlignedChunkData alignedChunkData : alignedChunkDataList) 
{
+                  alignedChunkData.writeDecodeValuePage(times, values, 
header.getDataType());
+                }
+              }
 
               pageIndex += 1;
               dataSize -= pageDataSize;
@@ -406,27 +288,9 @@ public class LoadSingleTsFileNode extends WritePlanNode {
         }
       }
 
-      handleModification(offset2Deletions, tsFileDataList, Long.MAX_VALUE);
+      consumeAllAlignedChunkData(reader.position(), pageIndex2ChunkData);
+      handleModification(offset2Deletions, Long.MAX_VALUE);
     }
-
-    for (TsFileData tsFileData : tsFileDataList) {
-      if (!tsFileData.isModification()) {
-        ChunkData chunkData = (ChunkData) tsFileData;
-        getPieceNode(chunkData.getDevice(), chunkData.getTimePartitionSlot(), 
dataPartition)
-            .addTsFileData(chunkData);
-      } else {
-        for (Map.Entry<TRegionReplicaSet, List<LoadTsFilePieceNode>> entry :
-            replicaSet2Pieces.entrySet()) {
-          LoadTsFilePieceNode pieceNode = 
entry.getValue().get(entry.getValue().size() - 1);
-          pieceNode.addTsFileData(tsFileData);
-        }
-      }
-    }
-
-    logger.info(
-        String.format(
-            "Finish Parsing TsFile %s, split to %d pieces, send to %d 
RegionReplicaSet.",
-            tsFile.getPath(), tsFileDataList.size(), 
replicaSet2Pieces.keySet().size()));
   }
 
   private void getAllModification(Map<Long, List<Deletion>> offset2Deletions) 
throws IOException {
@@ -474,14 +338,42 @@ public class LoadSingleTsFileNode extends WritePlanNode {
   }
 
   private void handleModification(
-      TreeMap<Long, List<Deletion>> offset2Deletions,
-      List<TsFileData> tsFileDataList,
-      long chunkOffset) {
+      TreeMap<Long, List<Deletion>> offset2Deletions, long chunkOffset) {
     while (!offset2Deletions.isEmpty() && 
offset2Deletions.firstEntry().getKey() <= chunkOffset) {
-      tsFileDataList.addAll(
-          offset2Deletions.pollFirstEntry().getValue().stream()
-              .map(DeletionData::new)
-              .collect(Collectors.toList()));
+      offset2Deletions
+          .pollFirstEntry()
+          .getValue()
+          .forEach(o -> consumer.apply(new DeletionData(o)));
+    }
+  }
+
+  private void consumeAllAlignedChunkData(
+      long offset, Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData) {
+    if (pageIndex2ChunkData.isEmpty()) {
+      return;
+    }
+
+    Set<ChunkData> allChunkData = new HashSet<>();
+    for (Map.Entry<Integer, List<AlignedChunkData>> entry : 
pageIndex2ChunkData.entrySet()) {
+      allChunkData.addAll(entry.getValue());
+    }
+    for (ChunkData chunkData : allChunkData) {
+      if (!consumer.apply(chunkData)) {
+        throw new IllegalStateException(
+            String.format(
+                "Consume aligned chunk data error, next chunk offset: %d, 
chunkData: %s",
+                offset, chunkData));
+      }
+    }
+    pageIndex2ChunkData.clear();
+  }
+
+  private void consumeChunkData(String measurement, long offset, ChunkData 
chunkData) {
+    if (!consumer.apply(chunkData)) {
+      throw new IllegalStateException(
+          String.format(
+              "Consume chunkData error, chunk offset: %d, measurement: %s, 
chunkData: %s",
+              offset, measurement, chunkData));
     }
   }
 
@@ -499,7 +391,7 @@ public class LoadSingleTsFileNode extends WritePlanNode {
         
.equals(TimePartitionUtils.getTimePartitionForRouting(pageHeader.getEndTime()));
   }
 
-  private long[] decodePage(
+  private Pair<long[], Object[]> decodePage(
       boolean isAligned,
       ByteBuffer pageData,
       PageHeader pageHeader,
@@ -509,57 +401,54 @@ public class LoadSingleTsFileNode extends WritePlanNode {
       throws IOException {
     if (isAligned) {
       TimePageReader timePageReader = new TimePageReader(pageHeader, pageData, 
timeDecoder);
-      return timePageReader.getNextTimeBatch();
+      long[] times = timePageReader.getNextTimeBatch();
+      return new Pair<>(times, new Object[times.length]);
     }
 
     valueDecoder.reset();
     PageReader pageReader =
         new PageReader(pageData, chunkHeader.getDataType(), valueDecoder, 
timeDecoder, null);
     BatchData batchData = pageReader.getAllSatisfiedPageData();
-    long[] timeBatch = new long[batchData.length()];
+    long[] times = new long[batchData.length()];
+    Object[] values = new Object[batchData.length()];
     int index = 0;
     while (batchData.hasCurrent()) {
-      timeBatch[index++] = batchData.currentTime();
+      times[index] = batchData.currentTime();
+      values[index++] = batchData.currentValue();
       batchData.next();
     }
-    return timeBatch;
+    return new Pair<>(times, values);
   }
 
   private void handleEmptyValueChunk(
-      long chunkOffset,
-      ChunkHeader header,
-      IChunkMetadata chunkMetadata,
-      Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData) {
+      ChunkHeader header, Map<Integer, List<AlignedChunkData>> 
pageIndex2ChunkData) {
     Set<ChunkData> allChunkData = new HashSet<>();
     for (Map.Entry<Integer, List<AlignedChunkData>> entry : 
pageIndex2ChunkData.entrySet()) {
       for (AlignedChunkData alignedChunkData : entry.getValue()) {
         if (!allChunkData.contains(alignedChunkData)) {
-          alignedChunkData.addValueChunk(chunkOffset, header, chunkMetadata);
+          alignedChunkData.addValueChunk(header);
           allChunkData.add(alignedChunkData);
         }
       }
     }
   }
 
-  private LoadTsFilePieceNode getPieceNode(
-      String device, TTimePartitionSlot timePartitionSlot, DataPartition 
dataPartition) {
-    TRegionReplicaSet replicaSet =
-        dataPartition.getDataRegionReplicaSetForWriting(device, 
timePartitionSlot);
-    List<LoadTsFilePieceNode> pieceNodes =
-        replicaSet2Pieces.computeIfAbsent(replicaSet, o -> new ArrayList<>());
-    if (pieceNodes.isEmpty() || pieceNodes.get(pieceNodes.size() - 
1).exceedSize()) {
-      pieceNodes.add(new LoadTsFilePieceNode(getPlanNodeId(), tsFile));
+  private TsPrimitiveType[] decodeValuePage(
+      TsFileSequenceReader reader,
+      ChunkHeader chunkHeader,
+      PageHeader pageHeader,
+      long[] times,
+      Decoder valueDecoder)
+      throws IOException {
+    if (pageHeader.getSerializedPageSize() == 0) {
+      return new TsPrimitiveType[times.length];
     }
-    return pieceNodes.get(pieceNodes.size() - 1);
-  }
 
-  public void clean() {
-    try {
-      if (deleteAfterLoad) {
-        Files.deleteIfExists(tsFile.toPath());
-      }
-    } catch (IOException e) {
-      logger.warn(String.format("Delete After Loading %s error.", tsFile), e);
-    }
+    valueDecoder.reset();
+    ByteBuffer pageData = reader.readPage(pageHeader, 
chunkHeader.getCompressionType());
+    ValuePageReader valuePageReader =
+        new ValuePageReader(pageHeader, pageData, chunkHeader.getDataType(), 
valueDecoder);
+    return valuePageReader.nextValueBatch(
+        times); // should be origin time, so recording satisfied length is 
necessary
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index cf6acaa2f9..b6e2c737a9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -110,8 +110,6 @@ import org.apache.iotdb.db.utils.FileLoaderUtils;
 import org.apache.iotdb.db.utils.TimePartitionUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
-import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -1640,14 +1638,14 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
           for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) 
{
             TSDataType dataType = timeseriesMetadata.getTSDataType();
             if (!dataType.equals(TSDataType.VECTOR)) {
-              ChunkHeader chunkHeader =
-                  getChunkHeaderByTimeseriesMetadata(reader, 
timeseriesMetadata);
+              Pair<CompressionType, TSEncoding> pair =
+                  
reader.readTimeseriesCompressionTypeAndEncoding(timeseriesMetadata);
               MeasurementSchema measurementSchema =
                   new MeasurementSchema(
                       timeseriesMetadata.getMeasurementId(),
                       dataType,
-                      chunkHeader.getEncodingType(),
-                      chunkHeader.getCompressionType());
+                      pair.getRight(),
+                      pair.getLeft());
               device2Schemas
                   .computeIfAbsent(device, o -> new HashMap<>())
                   .put(measurementSchema, tsFile);
@@ -1697,13 +1695,6 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     }
   }
 
-  private ChunkHeader getChunkHeaderByTimeseriesMetadata(
-      TsFileSequenceReader reader, TimeseriesMetadata timeseriesMetadata) 
throws IOException {
-    IChunkMetadata chunkMetadata = 
timeseriesMetadata.getChunkMetadataList().get(0);
-    reader.position(chunkMetadata.getOffsetOfChunkHeader());
-    return reader.readChunkHeader(reader.readMarker());
-  }
-
   private void autoCreateSg(int sgLevel, Map<String, Map<MeasurementSchema, 
File>> device2Schemas)
       throws VerifyMetadataException, LoadFileException, IllegalPathException {
     sgLevel += 1; // e.g. "root.sg" means sgLevel = 1, "root.sg.test" means 
sgLevel=2
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index 03bd5f3bae..04da130949 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -25,12 +25,6 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.load.AlignedChunkData;
-import org.apache.iotdb.db.engine.load.ChunkData;
-import org.apache.iotdb.db.engine.load.DeletionData;
-import org.apache.iotdb.db.engine.load.TsFileData;
-import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
@@ -38,24 +32,7 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.utils.TimePartitionUtils;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
-import org.apache.iotdb.tsfile.exception.TsFileRuntimeException;
-import org.apache.iotdb.tsfile.file.MetaMarker;
-import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
-import org.apache.iotdb.tsfile.file.header.PageHeader;
-import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.page.PageReader;
-import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,49 +42,55 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
-import java.util.stream.Collectors;
 
 public class LoadSingleTsFileNode extends WritePlanNode {
   private static final Logger logger = 
LoggerFactory.getLogger(LoadSingleTsFileNode.class);
 
   private File tsFile;
+  private TsFileResource resource;
   private boolean needDecodeTsFile;
+  private boolean deleteAfterLoad;
 
-  private Map<TRegionReplicaSet, List<LoadTsFilePieceNode>> replicaSet2Pieces;
-
-  private TsFileResource resource;
   private TRegionReplicaSet localRegionReplicaSet;
-
-  private boolean deleteAfterLoad;
+  private DataPartition dataPartition;
 
   public LoadSingleTsFileNode(PlanNodeId id) {
     super(id);
   }
 
-  public LoadSingleTsFileNode(PlanNodeId id, TsFileResource resource, boolean 
deleteAfterLoad) {
+  public LoadSingleTsFileNode(
+      PlanNodeId id,
+      TsFileResource resource,
+      boolean deleteAfterLoad,
+      DataPartition dataPartition) {
     super(id);
     this.tsFile = resource.getTsFile();
     this.resource = resource;
     this.deleteAfterLoad = deleteAfterLoad;
+    this.dataPartition = dataPartition;
   }
 
-  public void checkIfNeedDecodeTsFile(DataPartition dataPartition) throws 
IOException {
+  public void checkIfNeedDecodeTsFile() throws IOException {
     Set<TRegionReplicaSet> allRegionReplicaSet = new HashSet<>();
+    TTimePartitionSlot timePartitionSlot = null;
     needDecodeTsFile = false;
     for (String device : resource.getDevices()) {
-      if 
(!TimePartitionUtils.getTimePartitionForRouting(resource.getStartTime(device))
-          
.equals(TimePartitionUtils.getTimePartitionForRouting(resource.getEndTime(device))))
 {
+      TTimePartitionSlot startSlot =
+          
TimePartitionUtils.getTimePartitionForRouting(resource.getStartTime(device));
+      if (timePartitionSlot == null) {
+        timePartitionSlot = startSlot;
+      }
+      if (!startSlot.equals(timePartitionSlot)
+          || 
!TimePartitionUtils.getTimePartitionForRouting(resource.getEndTime(device))
+              .equals(timePartitionSlot)) {
         needDecodeTsFile = true;
         return;
       }
-      
allRegionReplicaSet.addAll(dataPartition.getAllDataRegionReplicaSetForOneDevice(device));
+      allRegionReplicaSet.add(
+          dataPartition.getDataRegionReplicaSetForWriting(device, 
timePartitionSlot));
     }
     needDecodeTsFile = !isDispatchedToLocal(allRegionReplicaSet);
     if (!needDecodeTsFile && !resource.resourceFileExists()) {
@@ -152,12 +135,12 @@ public class LoadSingleTsFileNode extends WritePlanNode {
     return localRegionReplicaSet;
   }
 
-  public TsFileResource getTsFileResource() {
-    return resource;
+  public DataPartition getDataPartition() {
+    return dataPartition;
   }
 
-  public Map<TRegionReplicaSet, List<LoadTsFilePieceNode>> 
getReplicaSet2Pieces() {
-    return replicaSet2Pieces;
+  public TsFileResource getTsFileResource() {
+    return resource;
   }
 
   @Override
@@ -209,354 +192,14 @@ public class LoadSingleTsFileNode extends WritePlanNode {
         + '}';
   }
 
-  public void splitTsFileByDataPartition(DataPartition dataPartition) throws 
IOException {
-    replicaSet2Pieces = new HashMap<>();
-    List<TsFileData> tsFileDataList = new ArrayList<>();
-
-    try (TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFile.getAbsolutePath())) {
-      TreeMap<Long, List<Deletion>> offset2Deletions = new TreeMap<>();
-      getAllModification(offset2Deletions);
-
-      if (!checkMagic(reader)) {
-        throw new TsFileRuntimeException(
-            String.format("Magic String check error when parsing TsFile %s.", 
tsFile.getPath()));
-      }
-
-      reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
-      String curDevice = null;
-      boolean isTimeChunkNeedDecode = true;
-      Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData = null;
-      Map<Long, IChunkMetadata> offset2ChunkMetadata = new HashMap<>();
-      getChunkMetadata(reader, offset2ChunkMetadata);
-      byte marker;
-      while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
-        switch (marker) {
-          case MetaMarker.CHUNK_HEADER:
-          case MetaMarker.TIME_CHUNK_HEADER:
-          case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
-          case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
-            long chunkOffset = reader.position();
-            handleModification(offset2Deletions, tsFileDataList, chunkOffset);
-
-            ChunkHeader header = reader.readChunkHeader(marker);
-            if (header.getDataSize() == 0) {
-              throw new TsFileRuntimeException(
-                  String.format("Chunk data error when parsing TsFile %s.", 
tsFile.getPath()));
-            }
-
-            boolean isAligned =
-                ((header.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
-                    == TsFileConstant.TIME_COLUMN_MASK);
-            IChunkMetadata chunkMetadata = 
offset2ChunkMetadata.get(chunkOffset - Byte.BYTES);
-            TTimePartitionSlot timePartitionSlot =
-                
TimePartitionUtils.getTimePartitionForRouting(chunkMetadata.getStartTime());
-            ChunkData chunkData =
-                ChunkData.createChunkData(isAligned, reader.position(), 
curDevice, header);
-            chunkData.setTimePartitionSlot(timePartitionSlot);
-            if (!needDecodeChunk(chunkMetadata)) {
-              if (isAligned) {
-                isTimeChunkNeedDecode = false;
-                pageIndex2ChunkData = new HashMap<>();
-                pageIndex2ChunkData
-                    .computeIfAbsent(1, o -> new ArrayList<>())
-                    .add((AlignedChunkData) chunkData);
-              }
-              chunkData.setNotDecode(chunkMetadata);
-              chunkData.addDataSize(header.getDataSize());
-              tsFileDataList.add(chunkData);
-              reader.position(reader.position() + header.getDataSize());
-              break;
-            }
-            if (isAligned) {
-              isTimeChunkNeedDecode = true;
-              pageIndex2ChunkData = new HashMap<>();
-            }
-
-            Decoder defaultTimeDecoder =
-                Decoder.getDecoderByType(
-                    
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
-                    TSDataType.INT64);
-            Decoder valueDecoder =
-                Decoder.getDecoderByType(header.getEncodingType(), 
header.getDataType());
-            int dataSize = header.getDataSize();
-            int pageIndex = 0;
-            while (dataSize > 0) {
-              long pageOffset = reader.position();
-              PageHeader pageHeader =
-                  reader.readPageHeader(
-                      header.getDataType(),
-                      (header.getChunkType() & 0x3F) == 
MetaMarker.CHUNK_HEADER);
-              long pageDataSize = pageHeader.getSerializedPageSize();
-              if (!needDecodePage(pageHeader, chunkMetadata)) { // an entire 
page
-                long startTime =
-                    pageHeader.getStatistics() == null
-                        ? chunkMetadata.getStartTime()
-                        : pageHeader.getStartTime();
-                TTimePartitionSlot pageTimePartitionSlot =
-                    TimePartitionUtils.getTimePartitionForRouting(startTime);
-                if (!timePartitionSlot.equals(pageTimePartitionSlot)) {
-                  tsFileDataList.add(chunkData);
-                  timePartitionSlot = pageTimePartitionSlot;
-                  chunkData = ChunkData.createChunkData(isAligned, pageOffset, 
curDevice, header);
-                  chunkData.setTimePartitionSlot(timePartitionSlot);
-                }
-                if (isAligned) {
-                  pageIndex2ChunkData
-                      .computeIfAbsent(pageIndex, o -> new ArrayList<>())
-                      .add((AlignedChunkData) chunkData);
-                }
-                chunkData.addDataSize(pageDataSize);
-                reader.position(pageOffset + pageDataSize);
-              } else { // split page
-                ByteBuffer pageData = reader.readPage(pageHeader, 
header.getCompressionType());
-                long[] timeBatch =
-                    decodePage(
-                        isAligned, pageData, pageHeader, defaultTimeDecoder, 
valueDecoder, header);
-                boolean isFirstData = true;
-                for (long currentTime : timeBatch) {
-                  TTimePartitionSlot currentTimePartitionSlot =
-                      TimePartitionUtils.getTimePartitionForRouting(
-                          currentTime); // TODO: can speed up
-                  if (!timePartitionSlot.equals(currentTimePartitionSlot)) {
-                    if (!isFirstData) {
-                      chunkData.setTailPageNeedDecode(true); // close last 
chunk data
-                      chunkData.addDataSize(pageDataSize);
-                      if (isAligned) {
-                        pageIndex2ChunkData
-                            .computeIfAbsent(pageIndex, o -> new ArrayList<>())
-                            .add((AlignedChunkData) chunkData);
-                      }
-                    }
-                    tsFileDataList.add(chunkData);
-
-                    chunkData =
-                        ChunkData.createChunkData(
-                            isAligned, pageOffset, curDevice, header); // open 
a new chunk data
-                    chunkData.setTimePartitionSlot(currentTimePartitionSlot);
-                    chunkData.setHeadPageNeedDecode(true);
-                    timePartitionSlot = currentTimePartitionSlot;
-                  }
-                  isFirstData = false;
-                }
-                chunkData.addDataSize(pageDataSize);
-                if (isAligned) {
-                  pageIndex2ChunkData
-                      .computeIfAbsent(pageIndex, o -> new ArrayList<>())
-                      .add((AlignedChunkData) chunkData);
-                }
-              }
-
-              pageIndex += 1;
-              dataSize -= pageDataSize;
-            }
-
-            tsFileDataList.add(chunkData);
-            break;
-          case MetaMarker.VALUE_CHUNK_HEADER:
-          case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
-            chunkOffset = reader.position();
-            chunkMetadata = offset2ChunkMetadata.get(chunkOffset - Byte.BYTES);
-            header = reader.readChunkHeader(marker);
-            if (header.getDataSize() == 0) {
-              handleEmptyValueChunk(chunkOffset, header, chunkMetadata, 
pageIndex2ChunkData);
-              break;
-            }
-
-            Set<ChunkData> allChunkData = new HashSet<>();
-            if (!isTimeChunkNeedDecode) {
-              AlignedChunkData alignedChunkData = 
pageIndex2ChunkData.get(1).get(0);
-              alignedChunkData.addValueChunk(chunkOffset, header, 
chunkMetadata);
-              alignedChunkData.addValueChunkDataSize(header.getDataSize());
-              reader.position(reader.position() + header.getDataSize());
-              break;
-            }
-
-            dataSize = header.getDataSize();
-            pageIndex = 0;
-
-            while (dataSize > 0) {
-              long pageOffset = reader.position();
-              PageHeader pageHeader =
-                  reader.readPageHeader(
-                      header.getDataType(),
-                      (header.getChunkType() & 0x3F) == 
MetaMarker.CHUNK_HEADER);
-              long pageDataSize = pageHeader.getSerializedPageSize();
-              for (AlignedChunkData alignedChunkData : 
pageIndex2ChunkData.get(pageIndex)) {
-                if (!allChunkData.contains(alignedChunkData)) {
-                  alignedChunkData.addValueChunk(pageOffset, header, 
chunkMetadata);
-                  allChunkData.add(alignedChunkData);
-                }
-                alignedChunkData.addValueChunkDataSize(pageDataSize);
-              }
-              reader.position(pageOffset + pageDataSize);
-
-              pageIndex += 1;
-              dataSize -= pageDataSize;
-            }
-            break;
-          case MetaMarker.CHUNK_GROUP_HEADER:
-            ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
-            curDevice = chunkGroupHeader.getDeviceID();
-            break;
-          case MetaMarker.OPERATION_INDEX_RANGE:
-            reader.readPlanIndex();
-            break;
-          default:
-            MetaMarker.handleUnexpectedMarker(marker);
-        }
-      }
-
-      handleModification(offset2Deletions, tsFileDataList, Long.MAX_VALUE);
-    }
-
-    for (TsFileData tsFileData : tsFileDataList) {
-      if (!tsFileData.isModification()) {
-        ChunkData chunkData = (ChunkData) tsFileData;
-        getPieceNode(chunkData.getDevice(), chunkData.getTimePartitionSlot(), 
dataPartition)
-            .addTsFileData(chunkData);
-      } else {
-        for (Map.Entry<TRegionReplicaSet, List<LoadTsFilePieceNode>> entry :
-            replicaSet2Pieces.entrySet()) {
-          LoadTsFilePieceNode pieceNode = 
entry.getValue().get(entry.getValue().size() - 1);
-          pieceNode.addTsFileData(tsFileData);
-        }
-      }
-    }
-
-    logger.info(
-        String.format(
-            "Finish Parsing TsFile %s, split to %d pieces, send to %d 
RegionReplicaSet.",
-            tsFile.getPath(), tsFileDataList.size(), 
replicaSet2Pieces.keySet().size()));
-  }
-
-  private void getAllModification(Map<Long, List<Deletion>> offset2Deletions) 
throws IOException {
-    try (ModificationFile modificationFile =
-        new ModificationFile(tsFile.getAbsolutePath() + 
ModificationFile.FILE_SUFFIX)) {
-      for (Modification modification : modificationFile.getModifications()) {
-        offset2Deletions
-            .computeIfAbsent(modification.getFileOffset(), o -> new 
ArrayList<>())
-            .add((Deletion) modification);
-      }
-    }
-  }
-
-  private boolean checkMagic(TsFileSequenceReader reader) throws IOException {
-    String magic = reader.readHeadMagic();
-    if (!magic.equals(TSFileConfig.MAGIC_STRING)) {
-      logger.error("the file's MAGIC STRING is incorrect, file path: {}", 
reader.getFileName());
-      return false;
-    }
-
-    byte versionNumber = reader.readVersionNumber();
-    if (versionNumber != TSFileConfig.VERSION_NUMBER) {
-      logger.error("the file's Version Number is incorrect, file path: {}", 
reader.getFileName());
-      return false;
-    }
-
-    if (!reader.readTailMagic().equals(TSFileConfig.MAGIC_STRING)) {
-      logger.error("the file is not closed correctly, file path: {}", 
reader.getFileName());
-      return false;
-    }
-    return true;
-  }
-
-  private void getChunkMetadata(
-      TsFileSequenceReader reader, Map<Long, IChunkMetadata> 
offset2ChunkMetadata)
-      throws IOException {
-    Map<String, List<TimeseriesMetadata>> device2Metadata = 
reader.getAllTimeseriesMetadata(true);
-    for (Map.Entry<String, List<TimeseriesMetadata>> entry : 
device2Metadata.entrySet()) {
-      for (TimeseriesMetadata timeseriesMetadata : entry.getValue()) {
-        for (IChunkMetadata chunkMetadata : 
timeseriesMetadata.getChunkMetadataList()) {
-          offset2ChunkMetadata.put(chunkMetadata.getOffsetOfChunkHeader(), 
chunkMetadata);
-        }
-      }
-    }
-  }
-
-  private void handleModification(
-      TreeMap<Long, List<Deletion>> offset2Deletions,
-      List<TsFileData> tsFileDataList,
-      long chunkOffset) {
-    while (!offset2Deletions.isEmpty() && 
offset2Deletions.firstEntry().getKey() <= chunkOffset) {
-      tsFileDataList.addAll(
-          offset2Deletions.pollFirstEntry().getValue().stream()
-              .map(DeletionData::new)
-              .collect(Collectors.toList()));
-    }
-  }
-
-  private boolean needDecodeChunk(IChunkMetadata chunkMetadata) {
-    return 
!TimePartitionUtils.getTimePartitionForRouting(chunkMetadata.getStartTime())
-        
.equals(TimePartitionUtils.getTimePartitionForRouting(chunkMetadata.getEndTime()));
-  }
-
-  private boolean needDecodePage(PageHeader pageHeader, IChunkMetadata 
chunkMetadata) {
-    if (pageHeader.getStatistics() == null) {
-      return 
!TimePartitionUtils.getTimePartitionForRouting(chunkMetadata.getStartTime())
-          
.equals(TimePartitionUtils.getTimePartitionForRouting(chunkMetadata.getEndTime()));
-    }
-    return 
!TimePartitionUtils.getTimePartitionForRouting(pageHeader.getStartTime())
-        
.equals(TimePartitionUtils.getTimePartitionForRouting(pageHeader.getEndTime()));
-  }
-
-  private long[] decodePage(
-      boolean isAligned,
-      ByteBuffer pageData,
-      PageHeader pageHeader,
-      Decoder timeDecoder,
-      Decoder valueDecoder,
-      ChunkHeader chunkHeader)
-      throws IOException {
-    if (isAligned) {
-      TimePageReader timePageReader = new TimePageReader(pageHeader, pageData, 
timeDecoder);
-      return timePageReader.getNextTimeBatch();
-    }
-
-    valueDecoder.reset();
-    PageReader pageReader =
-        new PageReader(pageData, chunkHeader.getDataType(), valueDecoder, 
timeDecoder, null);
-    BatchData batchData = pageReader.getAllSatisfiedPageData();
-    long[] timeBatch = new long[batchData.length()];
-    int index = 0;
-    while (batchData.hasCurrent()) {
-      timeBatch[index++] = batchData.currentTime();
-      batchData.next();
-    }
-    return timeBatch;
-  }
-
-  private void handleEmptyValueChunk(
-      long chunkOffset,
-      ChunkHeader header,
-      IChunkMetadata chunkMetadata,
-      Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData) {
-    Set<ChunkData> allChunkData = new HashSet<>();
-    for (Map.Entry<Integer, List<AlignedChunkData>> entry : 
pageIndex2ChunkData.entrySet()) {
-      for (AlignedChunkData alignedChunkData : entry.getValue()) {
-        if (!allChunkData.contains(alignedChunkData)) {
-          alignedChunkData.addValueChunk(chunkOffset, header, chunkMetadata);
-          allChunkData.add(alignedChunkData);
-        }
-      }
-    }
-  }
-
-  private LoadTsFilePieceNode getPieceNode(
-      String device, TTimePartitionSlot timePartitionSlot, DataPartition 
dataPartition) {
-    TRegionReplicaSet replicaSet =
-        dataPartition.getDataRegionReplicaSetForWriting(device, 
timePartitionSlot);
-    List<LoadTsFilePieceNode> pieceNodes =
-        replicaSet2Pieces.computeIfAbsent(replicaSet, o -> new ArrayList<>());
-    if (pieceNodes.isEmpty() || pieceNodes.get(pieceNodes.size() - 
1).exceedSize()) {
-      pieceNodes.add(new LoadTsFilePieceNode(getPlanNodeId(), tsFile));
-    }
-    return pieceNodes.get(pieceNodes.size() - 1);
-  }
-
   public void clean() {
     try {
       if (deleteAfterLoad) {
         Files.deleteIfExists(tsFile.toPath());
+        Files.deleteIfExists(
+            new File(tsFile.getAbsolutePath() + 
TsFileResource.RESOURCE_SUFFIX).toPath());
+        Files.deleteIfExists(
+            new File(tsFile.getAbsolutePath() + 
ModificationFile.FILE_SUFFIX).toPath());
       }
     } catch (IOException e) {
       logger.warn(String.format("Delete After Loading %s error.", tsFile), e);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java
index 0ecb865963..db2a1db1ab 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -92,17 +92,18 @@ public class LoadTsFileNode extends WritePlanNode {
     for (TsFileResource resource : resources) {
       try {
         LoadSingleTsFileNode singleTsFileNode =
-            new LoadSingleTsFileNode(getPlanNodeId(), resource, 
statement.isDeleteAfterLoad());
-        
singleTsFileNode.checkIfNeedDecodeTsFile(analysis.getDataPartitionInfo());
-        if (singleTsFileNode.needDecodeTsFile()) {
-          
singleTsFileNode.splitTsFileByDataPartition(analysis.getDataPartitionInfo());
-        } else {
-          logger.info(
-              String.format("TsFile %s will be loaded to local.", 
resource.getTsFile().getPath()));
-        }
+            new LoadSingleTsFileNode(
+                getPlanNodeId(),
+                resource,
+                statement.isDeleteAfterLoad(),
+                analysis.getDataPartitionInfo());
+        singleTsFileNode.checkIfNeedDecodeTsFile();
         res.add(singleTsFileNode);
       } catch (Exception e) {
-        logger.error(String.format("Parse TsFile %s error", 
resource.getTsFile().getPath()), e);
+        logger.error(
+            String.format(
+                "Check whether TsFile %s need decode or not error", 
resource.getTsFile().getPath()),
+            e);
       }
     }
     return res;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFilePieceNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFilePieceNode.java
index b0021b6d46..73baf01838 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFilePieceNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/load/LoadTsFilePieceNode.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.load;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.load.TsFileData;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -48,7 +46,6 @@ import java.util.List;
 
 public class LoadTsFilePieceNode extends WritePlanNode {
   private static final Logger logger = 
LoggerFactory.getLogger(LoadTsFilePieceNode.class);
-  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
 
   private File tsFile;
 
@@ -66,9 +63,8 @@ public class LoadTsFilePieceNode extends WritePlanNode {
     this.tsFileDataList = new ArrayList<>();
   }
 
-  public boolean exceedSize() {
-    return dataSize >= config.getThriftMaxFrameSize() / 2
-        || dataSize >= config.getAllocateMemoryForFree() / 2;
+  public long getDataSize() {
+    return dataSize;
   }
 
   public void addTsFileData(TsFileData tsFileData) {
@@ -131,11 +127,12 @@ public class LoadTsFilePieceNode extends WritePlanNode {
     ReadWriteIOUtils.write(tsFileDataList.size(), stream);
     for (TsFileData tsFileData : tsFileDataList) {
       try {
-        tsFileData.serialize(stream, tsFile);
+        tsFileData.serialize(stream);
       } catch (IOException e) {
         logger.error(
             String.format(
-                "Parse page of TsFile %s error, skip chunk %s", 
tsFile.getPath(), tsFileData));
+                "Serialize data of TsFile %s error, skip TsFileData %s",
+                tsFile.getPath(), tsFileData));
       }
     }
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index a84a914d6c..3471aa3563 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -112,7 +112,7 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
       if (isDispatchedToLocal(endPoint)) {
         dispatchLocally(instance);
       } else {
-        dispatchRemote(instance, endPoint); // TODO: can read only once
+        dispatchRemote(instance, endPoint);
       }
     }
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
index 69be1f1614..2d6be8ad60 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
@@ -24,6 +24,11 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.load.ChunkData;
+import org.apache.iotdb.db.engine.load.TsFileData;
+import org.apache.iotdb.db.engine.load.TsFileSplitter;
 import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
@@ -46,6 +51,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -56,6 +63,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 /**
  * {@link LoadTsFileScheduler} is used for scheduling {@link 
LoadSingleTsFileNode} and {@link
@@ -65,15 +73,17 @@ import java.util.concurrent.TimeoutException;
  * 
href="https://apache-iotdb.feishu.cn/docx/doxcnyBYWzek8ksSEU6obZMpYLe";>...</a>;
  */
 public class LoadTsFileScheduler implements IScheduler {
-  public static final long LOAD_TASK_MAX_TIME_IN_SECOND = 5184000L; // one day
-
   private static final Logger logger = 
LoggerFactory.getLogger(LoadTsFileScheduler.class);
+  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+  public static final long LOAD_TASK_MAX_TIME_IN_SECOND = 5184000L; // 60 days
+  private static final long MAX_MEMORY_SIZE =
+      Math.min(config.getThriftMaxFrameSize() / 2, 
config.getAllocateMemoryForStorageEngine() / 8);
 
   private final MPPQueryContext queryContext;
   private final QueryStateMachine stateMachine;
-  private LoadTsFileDispatcherImpl dispatcher;
-  private List<LoadSingleTsFileNode> tsFileNodeList;
-  private PlanFragmentId fragmentId;
+  private final LoadTsFileDispatcherImpl dispatcher;
+  private final List<LoadSingleTsFileNode> tsFileNodeList;
+  private final PlanFragmentId fragmentId;
 
   private Set<TRegionReplicaSet> allReplicaSets;
 
@@ -124,75 +134,82 @@ public class LoadTsFileScheduler implements IScheduler {
   }
 
   private boolean firstPhase(LoadSingleTsFileNode node) {
-    if (!dispatchOneTsFile(node)) {
+    try {
+      TsFileDataManager tsFileDataManager = new TsFileDataManager(this, node);
+      new TsFileSplitter(
+              node.getTsFileResource().getTsFile(), 
tsFileDataManager::addOrSendTsFileData)
+          .splitTsFileByDataPartition();
+      if (!tsFileDataManager.sendAllTsFileData()) {
+        return false;
+      }
+    } catch (IllegalStateException e) {
+      logger.error(
+          String.format(
+              "Dispatch TsFileData error when parsing TsFile %s.",
+              node.getTsFileResource().getTsFile()),
+          e);
+      return false;
+    } catch (Exception e) {
+      stateMachine.transitionToFailed(e);
       logger.error(
-          String.format("Dispatch Single TsFile Node error, 
LoadSingleTsFileNode %s.", node));
+          String.format("Parse TsFile %s error.", 
node.getTsFileResource().getTsFile()), e);
       return false;
     }
     return true;
   }
 
-  private boolean dispatchOneTsFile(LoadSingleTsFileNode node) {
-    logger.info(
-        String.format(
-            "Start dispatching TsFile %s", 
node.getTsFileResource().getTsFile().getPath()));
-    for (Map.Entry<TRegionReplicaSet, List<LoadTsFilePieceNode>> entry :
-        node.getReplicaSet2Pieces().entrySet()) {
-      allReplicaSets.add(entry.getKey());
-      for (LoadTsFilePieceNode pieceNode : entry.getValue()) {
-        FragmentInstance instance =
-            new FragmentInstance(
-                new PlanFragment(fragmentId, pieceNode),
-                fragmentId.genFragmentInstanceId(),
-                null,
-                queryContext.getQueryType(),
-                queryContext.getTimeOut(),
-                queryContext.getSession());
-        instance.setDataRegionAndHost(entry.getKey());
-        Future<FragInstanceDispatchResult> dispatchResultFuture =
-            dispatcher.dispatch(Collections.singletonList(instance));
-
-        try {
-          FragInstanceDispatchResult result =
-              dispatchResultFuture.get(
-                  LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND, 
TimeUnit.SECONDS);
-          if (!result.isSuccessful()) {
-            // TODO: retry.
+  private boolean dispatchOnePieceNode(
+      LoadTsFilePieceNode pieceNode, TRegionReplicaSet replicaSet) {
+    allReplicaSets.add(replicaSet);
+    FragmentInstance instance =
+        new FragmentInstance(
+            new PlanFragment(fragmentId, pieceNode),
+            fragmentId.genFragmentInstanceId(),
+            null,
+            queryContext.getQueryType(),
+            queryContext.getTimeOut(),
+            queryContext.getSession());
+    instance.setDataRegionAndHost(replicaSet);
+    Future<FragInstanceDispatchResult> dispatchResultFuture =
+        dispatcher.dispatch(Collections.singletonList(instance));
+
+    try {
+      FragInstanceDispatchResult result =
+          dispatchResultFuture.get(
+              LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND, 
TimeUnit.SECONDS);
+      if (!result.isSuccessful()) {
+        // TODO: retry.
+        logger.error(
+            String.format(
+                "Dispatch one piece to ReplicaSet %s error, result status code 
%s.",
+                replicaSet, 
TSStatusCode.representOf(result.getFailureStatus().getCode()).name()));
+        logger.error(
+            String.format("Result status message %s.", 
result.getFailureStatus().getMessage()));
+        if (result.getFailureStatus().getSubStatus() != null) {
+          for (TSStatus status : result.getFailureStatus().getSubStatus()) {
             logger.error(
                 String.format(
-                    "Dispatch one piece  to ReplicaSet %s error, result status 
code %s.",
-                    entry.getKey(),
-                    
TSStatusCode.representOf(result.getFailureStatus().getCode()).name()));
-            logger.error(
-                String.format("Result status message %s.", 
result.getFailureStatus().getMessage()));
-            if (result.getFailureStatus().getSubStatus() != null) {
-              for (TSStatus status : result.getFailureStatus().getSubStatus()) 
{
-                logger.error(
-                    String.format(
-                        "Sub status code %s.", 
TSStatusCode.representOf(status.getCode()).name()));
-                logger.error(String.format("Sub status message %s.", 
status.getMessage()));
-              }
-            }
-            logger.error(String.format("Dispatch piece node:%n%s", pieceNode));
-            stateMachine.transitionToFailed(result.getFailureStatus()); // 
TODO: record more status
-            return false;
-          }
-        } catch (InterruptedException | ExecutionException | 
CancellationException e) {
-          if (e instanceof InterruptedException) {
-            Thread.currentThread().interrupt();
+                    "Sub status code %s.", 
TSStatusCode.representOf(status.getCode()).name()));
+            logger.error(String.format("Sub status message %s.", 
status.getMessage()));
           }
-          logger.warn("Interrupt or Execution error.", e);
-          stateMachine.transitionToFailed(e);
-          return false;
-        } catch (TimeoutException e) {
-          dispatchResultFuture.cancel(true);
-          logger.error(
-              String.format("Wait for loading %s time out.", 
LoadTsFilePieceNode.class.getName()),
-              e);
-          stateMachine.transitionToFailed(e);
-          return false;
         }
+        logger.error(String.format("Dispatch piece node error:%n%s", 
pieceNode));
+        stateMachine.transitionToFailed(result.getFailureStatus()); // TODO: 
record more status
+        return false;
       }
+    } catch (InterruptedException | ExecutionException | CancellationException 
e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      logger.warn("Interrupt or Execution error.", e);
+      stateMachine.transitionToFailed(e);
+      return false;
+    } catch (TimeoutException e) {
+      dispatchResultFuture.cancel(true);
+      logger.error(
+          String.format("Wait for loading %s time out.", 
LoadTsFilePieceNode.class.getName()), e);
+      stateMachine.transitionToFailed(e);
+      return false;
     }
     return true;
   }
@@ -276,4 +293,91 @@ public class LoadTsFileScheduler implements IScheduler {
     EXECUTE,
     ROLLBACK
   }
+
+  private class TsFileDataManager {
+    private final LoadTsFileScheduler scheduler;
+    private final LoadSingleTsFileNode singleTsFileNode;
+
+    private long dataSize;
+    private Map<TRegionReplicaSet, LoadTsFilePieceNode> replicaSet2Piece;
+
+    public TsFileDataManager(LoadTsFileScheduler scheduler, 
LoadSingleTsFileNode singleTsFileNode) {
+      this.scheduler = scheduler;
+      this.singleTsFileNode = singleTsFileNode;
+      this.dataSize = 0;
+      this.replicaSet2Piece = new HashMap<>();
+    }
+
+    private boolean addOrSendTsFileData(TsFileData tsFileData) {
+      return tsFileData.isModification()
+          ? addOrSendDeletionData(tsFileData)
+          : addOrSendChunkData((ChunkData) tsFileData);
+    }
+
+    private boolean addOrSendChunkData(ChunkData chunkData) {
+      dataSize += chunkData.getDataSize();
+      if (dataSize > MAX_MEMORY_SIZE) {
+        List<TRegionReplicaSet> sortedReplicaSets =
+            replicaSet2Piece.keySet().stream()
+                .sorted(
+                    Comparator.comparingLong(o -> 
replicaSet2Piece.get(o).getDataSize()).reversed())
+                .collect(Collectors.toList());
+
+        for (TRegionReplicaSet sortedReplicaSet : sortedReplicaSets) {
+          LoadTsFilePieceNode pieceNode = 
replicaSet2Piece.get(sortedReplicaSet);
+          if (pieceNode.getDataSize() == 0) { // total data size has been 
reduced to 0
+            break;
+          }
+          if (!scheduler.dispatchOnePieceNode(pieceNode, sortedReplicaSet)) {
+            return false;
+          }
+
+          dataSize -= pieceNode.getDataSize();
+          replicaSet2Piece.put(
+              sortedReplicaSet,
+              new LoadTsFilePieceNode(
+                  singleTsFileNode.getPlanNodeId(),
+                  singleTsFileNode.getTsFileResource().getTsFile()));
+          if (dataSize <= MAX_MEMORY_SIZE) {
+            break;
+          }
+        }
+      }
+      TRegionReplicaSet replicaSet =
+          singleTsFileNode
+              .getDataPartition()
+              .getDataRegionReplicaSetForWriting(
+                  chunkData.getDevice(), chunkData.getTimePartitionSlot());
+      replicaSet2Piece
+          .computeIfAbsent(
+              replicaSet,
+              o ->
+                  new LoadTsFilePieceNode(
+                      singleTsFileNode.getPlanNodeId(),
+                      singleTsFileNode.getTsFileResource().getTsFile()))
+          .addTsFileData(chunkData);
+      return true;
+    }
+
+    private boolean addOrSendDeletionData(TsFileData deletionData) {
+      for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : 
replicaSet2Piece.entrySet()) {
+        dataSize += deletionData.getDataSize();
+        entry.getValue().addTsFileData(deletionData);
+      }
+      return true;
+    }
+
+    private boolean sendAllTsFileData() {
+      for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : 
replicaSet2Piece.entrySet()) {
+        if (!scheduler.dispatchOnePieceNode(entry.getValue(), entry.getKey())) 
{
+          logger.error(
+              String.format(
+                  "Dispatch piece node %s of TsFile %s error.",
+                  entry.getValue(), 
singleTsFileNode.getTsFileResource().getTsFile()));
+          return false;
+        }
+      }
+      return true;
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java 
b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index ee3ec2a058..bfd6a0bf80 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -31,6 +31,8 @@ import 
org.apache.iotdb.db.query.reader.chunk.metadata.DiskChunkMetadataLoader;
 import 
org.apache.iotdb.db.query.reader.chunk.metadata.MemAlignedChunkMetadataLoader;
 import org.apache.iotdb.db.query.reader.chunk.metadata.MemChunkMetadataLoader;
 import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
@@ -39,6 +41,7 @@ import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.reader.IChunkReader;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -86,6 +89,27 @@ public class FileLoaderUtils {
     }
   }
 
+  /**
+   * Generate {@link TsFileResource} from a closed {@link TsFileIOWriter}. 
Notice that the writer
+   * should have executed {@link TsFileIOWriter#endFile()}. And this method 
will not record plan
+   * Index of this writer.
+   *
+   * @param writer a {@link TsFileIOWriter}
+   * @return an updated {@link TsFileResource}
+   */
+  public static TsFileResource generateTsFileResource(TsFileIOWriter writer) {
+    TsFileResource resource = new TsFileResource(writer.getFile());
+    for (ChunkGroupMetadata chunkGroupMetadata : 
writer.getChunkGroupMetadataList()) {
+      String device = chunkGroupMetadata.getDevice();
+      for (ChunkMetadata chunkMetadata : 
chunkGroupMetadata.getChunkMetadataList()) {
+        resource.updateStartTime(device, chunkMetadata.getStartTime());
+        resource.updateEndTime(device, chunkMetadata.getEndTime());
+      }
+    }
+    resource.setStatus(TsFileResourceStatus.CLOSED);
+    return resource;
+  }
+
   /**
    * @param resource TsFile
    * @param seriesPath Timeseries path
diff --git 
a/server/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java 
b/server/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java
index f39de5ba04..412e419c6d 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/TimePartitionUtils.java
@@ -34,6 +34,10 @@ public class TimePartitionUtils {
     return timePartitionSlot;
   }
 
+  public static long getTimePartitionIntervalForRouting() {
+    return timePartitionIntervalForRouting;
+  }
+
   @TestOnly
   public static void setTimePartitionIntervalForRouting(long 
timePartitionIntervalForRouting) {
     TimePartitionUtils.timePartitionIntervalForRouting = 
timePartitionIntervalForRouting;
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
index 24a4418702..151e170495 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
@@ -21,10 +21,13 @@ package org.apache.iotdb.tsfile.file.header;
 
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
@@ -191,6 +194,24 @@ public class ChunkHeader {
         chunkType, measurementID, dataSize, chunkHeaderSize, dataType, type, 
encoding);
   }
 
+  /**
+   * Used by {@link
+   * 
TsFileSequenceReader#readTimeseriesCompressionTypeAndEncoding(TimeseriesMetadata)}
 to only
+   * decode data size, {@link CompressionType} and {@link TSEncoding}.
+   *
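+   * @param inputStream stream positioned at the chunk header's data size field
+   * @return a pair of the chunk's {@link CompressionType} and {@link TSEncoding}
+   * @throws IOException if reading from the stream fails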
+   */
+  public static Pair<CompressionType, TSEncoding> 
deserializeCompressionTypeAndEncoding(
+      InputStream inputStream) throws IOException {
+    ReadWriteForEncodingUtils.readUnsignedVarInt(inputStream);
+    inputStream.skip(Byte.BYTES); // skip Data type
+    CompressionType type = ReadWriteIOUtils.readCompressionType(inputStream);
+    TSEncoding encoding = ReadWriteIOUtils.readEncoding(inputStream);
+    return new Pair<>(type, encoding);
+  }
+
   public int getSerializedSize() {
     return serializedSize;
   }
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 8543f1fc8d..e713f5c9cb 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -56,6 +56,7 @@ import 
org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
 import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
 import org.apache.iotdb.tsfile.utils.BloomFilter;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -1173,6 +1174,27 @@ public class TsFileSequenceReader implements 
AutoCloseable {
         header, buffer, chunkCacheKey.getDeleteIntervalList(), 
chunkCacheKey.getStatistics());
   }
 
+  /**
+   * read the {@link CompressionType} and {@link TSEncoding} of a timeseries. 
This method will skip
+   * the measurement id, and data type. This method will change the position 
of this reader.
+   *
+   * @param timeseriesMetadata timeseries' metadata
+   * @return a pair of {@link CompressionType} and {@link TSEncoding} of given 
timeseries.
+   * @throws IOException if repositioning the reader or reading the chunk header fails
+   */
+  public Pair<CompressionType, TSEncoding> 
readTimeseriesCompressionTypeAndEncoding(
+      TimeseriesMetadata timeseriesMetadata) throws IOException {
+
+    String measurementId = timeseriesMetadata.getMeasurementId();
+    int measurementIdLength = 
measurementId.getBytes(TSFileConfig.STRING_CHARSET).length;
+    position(
+        
timeseriesMetadata.getChunkMetadataList().get(0).getOffsetOfChunkHeader()
+            + Byte.BYTES // chunkType
+            + ReadWriteForEncodingUtils.varIntSize(measurementIdLength) // 
measurementID length
+            + measurementIdLength); // measurementID
+    return 
ChunkHeader.deserializeCompressionTypeAndEncoding(tsFileInput.wrapAsInputStream());
+  }
+
   /** Get measurement schema by chunkMetadatas. */
   public MeasurementSchema getMeasurementSchema(List<IChunkMetadata> 
chunkMetadataList)
       throws IOException {
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 7cb1868e98..2be174a913 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -686,6 +686,10 @@ public class TsFileIOWriter implements AutoCloseable {
     return currentChunkGroupDeviceId;
   }
 
+  public List<ChunkGroupMetadata> getChunkGroupMetadataList() {
+    return chunkGroupMetadataList;
+  }
+
   public void flush() throws IOException {
     out.flush();
   }
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterEndFileTest.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterEndFileTest.java
new file mode 100644
index 0000000000..1e8fd25081
--- /dev/null
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriterEndFileTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.write.writer;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.io.File;
+
+public class TsFileIOWriterEndFileTest {
+  public static void main(String[] args) throws Exception {
+    TsFileIOWriter writer = new TsFileIOWriter(new File("test.tsfile"));
+    for (int deviceIndex = 0; deviceIndex < 1000; deviceIndex++) {
+      writer.startChunkGroup("root.sg.d" + deviceIndex);
+      for (int seriesIndex = 0; seriesIndex < 1000; seriesIndex++) {
+        ChunkWriterImpl chunkWriter =
+            new ChunkWriterImpl(
+                new MeasurementSchema(
+                    "s" + seriesIndex, TSDataType.INT32, TSEncoding.RLE, 
CompressionType.GZIP));
+        for (long time = 0; time < 10; ++time) {
+          chunkWriter.write(time, 0);
+        }
+        chunkWriter.writeToFileWriter(writer);
+      }
+      writer.endChunkGroup();
+    }
+    writer.endFile();
+  }
+}

Reply via email to