This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch load_v2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f8f9b096f7f3825e8d3ad399b1acf75a1e7dcc32
Author: Tian Jiang <[email protected]>
AuthorDate: Mon Sep 4 16:28:05 2023 +0800

    add MergedTsFileSplitter
---
 .../execution/load/AlignedChunkData.java           |  20 +
 .../db/queryengine/execution/load/ChunkData.java   |   4 +
 .../db/queryengine/execution/load/LoadUtils.java   |  25 +
 .../execution/load/MergedTsFileSplitter.java       | 713 +++++++++++++++++++++
 .../execution/load/NonAlignedChunkData.java        |  39 ++
 .../queryengine/execution/load/TsFileSplitter.java |   9 +-
 .../execution/load/MergedTsFileSplitterTest.java   |  62 ++
 .../db/queryengine/execution/load/TestBase.java    | 119 ++++
 .../execution/load/TsFileSplitterTest.java         |  48 ++
 .../threadpool/WrappedThreadPoolExecutor.java      |   2 +-
 .../iotdb/tsfile/file/header/ChunkHeader.java      |   4 +
 .../iotdb/tsfile/file/header/PageHeader.java       |  11 +-
 .../iotdb/tsfile/read/TsFileSequenceReader.java    |  12 +-
 .../org/apache/iotdb/tsfile/utils/TsFileUtils.java |  16 +
 14 files changed, 1067 insertions(+), 17 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
index 689a6236bb0..6fb25fec0c7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java
@@ -208,6 +208,21 @@ public class AlignedChunkData implements ChunkData {
     }
   }
 
+  @Override
+  public void writeDecodePage(long[] times, Object[] values, int start, int 
end)
+      throws IOException {
+    pageNumbers.set(pageNumbers.size() - 1, pageNumbers.get(pageNumbers.size() 
- 1) + 1);
+    satisfiedLengthQueue.offer(end - start);
+    // serialize needDecode==true
+    dataSize += ReadWriteIOUtils.write(true, stream);
+    dataSize += ReadWriteIOUtils.write(end - start, stream);
+
+    for (int i = start; i < end; i++) {
+      long time = times[i];
+      dataSize += ReadWriteIOUtils.write(time, stream);
+    }
+  }
+
   public void writeDecodeValuePage(long[] times, TsPrimitiveType[] values, 
TSDataType dataType)
       throws IOException {
     pageNumbers.set(pageNumbers.size() - 1, pageNumbers.get(pageNumbers.size() 
- 1) + 1);
@@ -419,6 +434,11 @@ public class AlignedChunkData implements ChunkData {
     stream.close();
   }
 
+  @Override
+  public String firstMeasurement() {
+    return chunkHeaderList.get(0).getMeasurementID();
+  }
+
   @Override
   public String toString() {
     return "AlignedChunkData{"
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java
index a30a62e2b16..e80993c5ead 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java
@@ -45,6 +45,10 @@ public interface ChunkData extends TsFileData {
 
   void writeDecodePage(long[] times, Object[] values, int satisfiedLength) 
throws IOException;
 
+  void writeDecodePage(long[] times, Object[] values, int start, int end) 
throws IOException;
+
+  String firstMeasurement();
+
   @Override
   default boolean isModification() {
     return false;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadUtils.java
new file mode 100644
index 00000000000..3cbec3dacf3
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadUtils.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
+public class LoadUtils {
+
+  private LoadUtils() {
+    // Util class
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
new file mode 100644
index 00000000000..1a969ebe766
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
+import java.io.DataOutputStream;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
+import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import org.apache.iotdb.db.utils.TimePartitionUtils;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.exception.TsFileRuntimeException;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
+import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.function.Function;
+
+public class MergedTsFileSplitter {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(TsFileSplitter.class);
+
+  private final List<File> tsFiles;
+  private final Function<TsFileData, Boolean> consumer;
+  private final PriorityQueue<SplitTask> taskPriorityQueue;
+  private ExecutorService asyncExecutor;
+
+  public MergedTsFileSplitter(List<File> tsFiles, Function<TsFileData, 
Boolean> consumer, ExecutorService asyncExecutor) {
+    this.tsFiles = tsFiles;
+    this.consumer = consumer;
+    this.asyncExecutor = asyncExecutor;
+    taskPriorityQueue = new PriorityQueue<>();
+  }
+
+  public void splitTsFileByDataPartition()
+      throws IOException, IllegalStateException {
+    for (File tsFile : tsFiles) {
+      SplitTask splitTask = new SplitTask(tsFile, asyncExecutor);
+      if (splitTask.hasNext()) {
+        taskPriorityQueue.add(splitTask);
+      }
+    }
+
+    List<SplitTask> equalTasks = new ArrayList<>();
+    while (!taskPriorityQueue.isEmpty()) {
+      SplitTask task = taskPriorityQueue.poll();
+      equalTasks.add(task);
+      // find chunks of the same series in other files
+      while (!taskPriorityQueue.isEmpty()) {
+        if (taskPriorityQueue.peek().compareTo(task) == 0) {
+          equalTasks.add(taskPriorityQueue.poll());
+        } else {
+          break;
+        }
+      }
+
+      for (SplitTask equalTask : equalTasks) {
+        TsFileData tsFileData = equalTask.removeNext();
+        consumer.apply(tsFileData);
+        if (equalTask.hasNext()) {
+          taskPriorityQueue.add(equalTask);
+        }
+      }
+      equalTasks.clear();
+    }
+  }
+
+  public void close() throws IOException {
+    for (SplitTask task : taskPriorityQueue) {
+      task.close();
+    }
+    taskPriorityQueue.clear();
+  }
+
+  private class SplitTask implements Comparable<SplitTask> {
+
+    private final File tsFile;
+    private TsFileSequenceReader reader;
+    private final TreeMap<Long, List<Deletion>> offset2Deletions;
+
+    private String curDevice;
+    private boolean isTimeChunkNeedDecode;
+    private Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData;
+    private Map<Integer, long[]> pageIndex2Times;
+    private Map<Long, IChunkMetadata> offset2ChunkMetadata;
+
+    private BlockingQueue<TsFileData> nextSplits;
+    private TsFileData nextSplit;
+    private byte marker = -1;
+
+    private ExecutorService asyncExecutor;
+    private Future<Void> asyncTask;
+
+    public SplitTask(File tsFile, ExecutorService asyncExecutor) throws 
IOException {
+      this.tsFile = tsFile;
+      this.asyncExecutor = asyncExecutor;
+      offset2Deletions = new TreeMap<>();
+      init();
+    }
+
+    @Override
+    public int compareTo(SplitTask o) {
+      try {
+        TsFileData thisNext = showNext();
+        TsFileData thatNext = o.showNext();
+        // out put modification first
+        if (thisNext.isModification() && thatNext.isModification()) {
+          return 0;
+        }
+        if (thisNext.isModification()) {
+          return 1;
+        }
+        if (thatNext.isModification()) {
+          return -1;
+        }
+
+        ChunkData thisChunk = (ChunkData) thisNext;
+        ChunkData thatChunk = ((ChunkData) thatNext);
+        Comparator<ChunkData> chunkDataComparator =
+            Comparator.comparing(ChunkData::getDevice, String::compareTo)
+                .thenComparing(ChunkData::firstMeasurement, String::compareTo);
+        return chunkDataComparator.compare(thisChunk, thatChunk);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    private void init() throws IOException {
+      this.reader = new TsFileSequenceReader(tsFile.getAbsolutePath());
+      getAllModification(offset2Deletions);
+
+      if (!checkMagic(reader)) {
+        throw new TsFileRuntimeException(
+            String.format("Magic String check error when parsing TsFile %s.", 
tsFile.getPath()));
+      }
+      reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
+
+      curDevice = null;
+      isTimeChunkNeedDecode = true;
+      pageIndex2ChunkData = new HashMap<>();
+      pageIndex2Times = null;
+      offset2ChunkMetadata = new HashMap<>();
+      getChunkMetadata(reader, offset2ChunkMetadata);
+
+      nextSplits = new LinkedBlockingDeque<>(64);
+      if (asyncExecutor != null) {
+        asyncTask = asyncExecutor.submit(() -> {
+          try {
+            asyncComputeNext();
+          } catch (Throwable e) {
+            logger.info("Exception during splitting", e);
+            throw e;
+          }
+          return null;
+        });
+      }
+    }
+
+    private void asyncComputeNext() throws IOException {
+      while (reader != null && !Thread.interrupted()) {
+        computeNext();
+      }
+      logger.info("{} end splitting", tsFile);
+    }
+
+    public boolean hasNext() throws IOException {
+      if (nextSplit != null && !(nextSplit instanceof EmptyTsFileData)) {
+        return true;
+      }
+      if (reader == null && nextSplits.isEmpty()) {
+        return false;
+      }
+
+      if (asyncExecutor == null) {
+        computeNext();
+        if (!nextSplits.isEmpty()) {
+          nextSplit = nextSplits.poll();
+          return true;
+        } else {
+          return false;
+        }
+      } else {
+        try {
+          nextSplit = nextSplits.take();
+        } catch (InterruptedException e) {
+          return false;
+        }
+        return !(nextSplit instanceof EmptyTsFileData);
+      }
+    }
+
+    public TsFileData removeNext() throws IOException {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      TsFileData split = nextSplit;
+      nextSplit = null;
+      return split;
+    }
+
+    public TsFileData showNext() throws IOException {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      return nextSplit;
+    }
+
+    public void close() throws IOException {
+      if (asyncTask != null) {
+        asyncTask.cancel(true);
+        try {
+          asyncTask.get();
+        } catch (CancellationException ignored) {
+
+        } catch (InterruptedException | ExecutionException e) {
+          throw new IOException(e);
+        }
+      }
+      if (reader != null) {
+        reader.close();
+        reader = null;
+      }
+    }
+
+    private byte nextMarker() throws IOException {
+      if (marker != -1) {
+        // inherit the marker from the previous breakpoint
+        // e.g. the marker after processing a chunk
+        return marker;
+      }
+      return marker = reader.readMarker();
+    }
+
+    private void insertNewChunk(ChunkData chunkData) throws IOException {
+      if (asyncExecutor == null) {
+        nextSplits.add(chunkData);
+      } else {
+        try {
+          nextSplits.put(chunkData);
+        } catch (InterruptedException e) {
+          throw new IOException(e);
+        }
+      }
+    }
+
+    @SuppressWarnings({"squid:S3776", "squid:S6541"})
+    private void computeNext() throws IOException, IllegalStateException {
+      if (reader == null) {
+        return;
+      }
+
+      while (nextMarker() != MetaMarker.SEPARATOR) {
+        switch (marker) {
+          case MetaMarker.CHUNK_HEADER:
+          case MetaMarker.TIME_CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
+            long chunkOffset = reader.position();
+            boolean chunkDataGenerated =
+                consumeAllAlignedChunkData(chunkOffset, pageIndex2ChunkData);
+            handleModification(offset2Deletions, chunkOffset);
+            if (chunkDataGenerated) {
+              return;
+            }
+
+            ChunkHeader header = reader.readChunkHeader(marker);
+            String measurementId = header.getMeasurementID();
+            if (header.getDataSize() == 0) {
+              throw new TsFileRuntimeException(
+                  String.format(
+                      "Empty Nonaligned Chunk or Time Chunk with offset %d in 
TsFile %s.",
+                      chunkOffset, tsFile.getPath()));
+            }
+
+            boolean isAligned =
+                ((header.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
+                    == TsFileConstant.TIME_COLUMN_MASK);
+            IChunkMetadata chunkMetadata = 
offset2ChunkMetadata.get(chunkOffset - Byte.BYTES);
+            TTimePartitionSlot timePartitionSlot =
+                
TimePartitionUtils.getTimePartition(chunkMetadata.getStartTime());
+            ChunkData chunkData =
+                ChunkData.createChunkData(isAligned, curDevice, header, 
timePartitionSlot);
+
+            if (!needDecodeChunk(chunkMetadata)) {
+              chunkData.setNotDecode();
+              chunkData.writeEntireChunk(reader.readChunk(-1, 
header.getDataSize()), chunkMetadata);
+              if (isAligned) {
+                isTimeChunkNeedDecode = false;
+                pageIndex2ChunkData
+                    .computeIfAbsent(1, o -> new ArrayList<>())
+                    .add((AlignedChunkData) chunkData);
+              } else {
+                insertNewChunk(chunkData);
+              }
+              break;
+            }
+
+            Decoder defaultTimeDecoder =
+                Decoder.getDecoderByType(
+                    
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+                    TSDataType.INT64);
+            Decoder valueDecoder =
+                Decoder.getDecoderByType(header.getEncodingType(), 
header.getDataType());
+            int dataSize = header.getDataSize();
+            int pageIndex = 0;
+            if (isAligned) {
+              isTimeChunkNeedDecode = true;
+              pageIndex2Times = new HashMap<>();
+            }
+
+            while (dataSize > 0) {
+              PageHeader pageHeader =
+                  reader.readPageHeader(
+                      header.getDataType(),
+                      (header.getChunkType() & 0x3F) == 
MetaMarker.CHUNK_HEADER);
+              long pageDataSize = pageHeader.getSerializedPageSize();
+              if (!needDecodePage(pageHeader, chunkMetadata)) { // an entire 
page
+                long startTime =
+                    pageHeader.getStatistics() == null
+                        ? chunkMetadata.getStartTime()
+                        : pageHeader.getStartTime();
+                TTimePartitionSlot pageTimePartitionSlot =
+                    TimePartitionUtils.getTimePartition(startTime);
+                if (!timePartitionSlot.equals(pageTimePartitionSlot)) {
+                  if (!isAligned) {
+                    insertNewChunk(chunkData);
+                  }
+                  timePartitionSlot = pageTimePartitionSlot;
+                  chunkData =
+                      ChunkData.createChunkData(isAligned, curDevice, header, 
timePartitionSlot);
+                }
+                if (isAligned) {
+                  pageIndex2ChunkData
+                      .computeIfAbsent(pageIndex, o -> new ArrayList<>())
+                      .add((AlignedChunkData) chunkData);
+                }
+                chunkData.writeEntirePage(pageHeader, 
reader.readCompressedPage(pageHeader));
+              } else { // split page
+                ByteBuffer pageData = reader.readPage(pageHeader, 
header.getCompressionType());
+                Pair<long[], Object[]> tvArray =
+                    decodePage(
+                        isAligned, pageData, pageHeader, defaultTimeDecoder, 
valueDecoder, header);
+                long[] times = tvArray.left;
+                Object[] values = tvArray.right;
+                if (isAligned) {
+                  pageIndex2Times.put(pageIndex, times);
+                }
+
+                int start = 0;
+                long endTime =
+                    timePartitionSlot.getStartTime()
+                        + TimePartitionUtils.getTimePartitionInterval();
+                for (int i = 0; i < times.length; i++) {
+                  if (times[i] >= endTime) {
+                    chunkData.writeDecodePage(times, values, start, i);
+                    if (isAligned) {
+                      pageIndex2ChunkData
+                          .computeIfAbsent(pageIndex, o -> new ArrayList<>())
+                          .add((AlignedChunkData) chunkData);
+                    } else {
+                      insertNewChunk(chunkData);
+                    }
+
+                    timePartitionSlot = 
TimePartitionUtils.getTimePartition(times[i]);
+                    endTime =
+                        timePartitionSlot.getStartTime()
+                            + TimePartitionUtils.getTimePartitionInterval();
+                    chunkData =
+                        ChunkData.createChunkData(isAligned, curDevice, 
header, timePartitionSlot);
+                    start = i;
+                  }
+                }
+                chunkData.writeDecodePage(times, values, start, times.length);
+                if (isAligned) {
+                  pageIndex2ChunkData
+                      .computeIfAbsent(pageIndex, o -> new ArrayList<>())
+                      .add((AlignedChunkData) chunkData);
+                }
+              }
+
+              pageIndex += 1;
+              dataSize -= pageDataSize;
+            }
+
+            if (!isAligned) {
+              insertNewChunk(chunkData);
+            }
+            break;
+          case MetaMarker.VALUE_CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
+            chunkOffset = reader.position();
+            chunkMetadata = offset2ChunkMetadata.get(chunkOffset - Byte.BYTES);
+            header = reader.readChunkHeader(marker);
+            if (header.getDataSize() == 0) {
+              handleEmptyValueChunk(
+                  header, pageIndex2ChunkData, chunkMetadata, 
isTimeChunkNeedDecode);
+              break;
+            }
+
+            if (!isTimeChunkNeedDecode) {
+              AlignedChunkData alignedChunkData = 
pageIndex2ChunkData.get(1).get(0);
+              alignedChunkData.addValueChunk(header);
+              alignedChunkData.writeEntireChunk(
+                  reader.readChunk(-1, header.getDataSize()), chunkMetadata);
+              break;
+            }
+
+            Set<ChunkData> allChunkData = new HashSet<>();
+            dataSize = header.getDataSize();
+            pageIndex = 0;
+            valueDecoder = Decoder.getDecoderByType(header.getEncodingType(), 
header.getDataType());
+
+            while (dataSize > 0) {
+              PageHeader pageHeader =
+                  reader.readPageHeader(
+                      header.getDataType(),
+                      (header.getChunkType() & 0x3F) == 
MetaMarker.CHUNK_HEADER);
+              List<AlignedChunkData> alignedChunkDataList = 
pageIndex2ChunkData.get(pageIndex);
+              for (AlignedChunkData alignedChunkData : alignedChunkDataList) {
+                if (!allChunkData.contains(alignedChunkData)) {
+                  alignedChunkData.addValueChunk(header);
+                  allChunkData.add(alignedChunkData);
+                }
+              }
+              if (alignedChunkDataList.size() == 1) { // write entire page
+                // write the entire page if it's not an empty page.
+                alignedChunkDataList
+                    .get(0)
+                    .writeEntirePage(pageHeader, 
reader.readCompressedPage(pageHeader));
+              } else { // decode page
+                long[] times = pageIndex2Times.get(pageIndex);
+                TsPrimitiveType[] values =
+                    decodeValuePage(reader, header, pageHeader, times, 
valueDecoder);
+                for (AlignedChunkData alignedChunkData : alignedChunkDataList) 
{
+                  alignedChunkData.writeDecodeValuePage(times, values, 
header.getDataType());
+                }
+              }
+              long pageDataSize = pageHeader.getSerializedPageSize();
+              pageIndex += 1;
+              dataSize -= pageDataSize;
+            }
+            break;
+          case MetaMarker.CHUNK_GROUP_HEADER:
+            ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
+            curDevice = chunkGroupHeader.getDeviceID();
+            break;
+          case MetaMarker.OPERATION_INDEX_RANGE:
+            reader.readPlanIndex();
+            break;
+          default:
+            MetaMarker.handleUnexpectedMarker(marker);
+        }
+        marker = -1;
+        if (!nextSplits.isEmpty()) {
+          return;
+        }
+      }
+
+      consumeAllAlignedChunkData(reader.position(), pageIndex2ChunkData);
+      handleModification(offset2Deletions, Long.MAX_VALUE);
+      close();
+      if (asyncExecutor != null) {
+        nextSplits.add(new EmptyTsFileData());
+      }
+    }
+
+    private class EmptyTsFileData implements TsFileData {
+
+      @Override
+      public long getDataSize() {
+        return 0;
+      }
+
+      @Override
+      public void writeToFileWriter(TsFileIOWriter writer) throws IOException {
+
+      }
+
+      @Override
+      public boolean isModification() {
+        return false;
+      }
+
+      @Override
+      public void serialize(DataOutputStream stream) throws IOException {
+
+      }
+    }
+
+    private void getAllModification(Map<Long, List<Deletion>> 
offset2Deletions) throws IOException {
+      try (ModificationFile modificationFile =
+          new ModificationFile(tsFile.getAbsolutePath() + 
ModificationFile.FILE_SUFFIX)) {
+        for (Modification modification : modificationFile.getModifications()) {
+          offset2Deletions
+              .computeIfAbsent(modification.getFileOffset(), o -> new 
ArrayList<>())
+              .add((Deletion) modification);
+        }
+      }
+    }
+
+    private boolean checkMagic(TsFileSequenceReader reader) throws IOException 
{
+      String magic = reader.readHeadMagic();
+      if (!magic.equals(TSFileConfig.MAGIC_STRING)) {
+        logger.error("the file's MAGIC STRING is incorrect, file path: {}", 
reader.getFileName());
+        return false;
+      }
+
+      byte versionNumber = reader.readVersionNumber();
+      if (versionNumber != TSFileConfig.VERSION_NUMBER) {
+        logger.error("the file's Version Number is incorrect, file path: {}", 
reader.getFileName());
+        return false;
+      }
+
+      if (!reader.readTailMagic().equals(TSFileConfig.MAGIC_STRING)) {
+        logger.error("the file is not closed correctly, file path: {}", 
reader.getFileName());
+        return false;
+      }
+      return true;
+    }
+
+    private void getChunkMetadata(
+        TsFileSequenceReader reader, Map<Long, IChunkMetadata> 
offset2ChunkMetadata)
+        throws IOException {
+      Map<String, List<TimeseriesMetadata>> device2Metadata = 
reader.getAllTimeseriesMetadata(true);
+      for (Map.Entry<String, List<TimeseriesMetadata>> entry : 
device2Metadata.entrySet()) {
+        for (TimeseriesMetadata timeseriesMetadata : entry.getValue()) {
+          for (IChunkMetadata chunkMetadata : 
timeseriesMetadata.getChunkMetadataList()) {
+            offset2ChunkMetadata.put(chunkMetadata.getOffsetOfChunkHeader(), 
chunkMetadata);
+          }
+        }
+      }
+    }
+
+    private void handleModification(
+        TreeMap<Long, List<Deletion>> offset2Deletions, long chunkOffset) {
+      while (!offset2Deletions.isEmpty() && 
offset2Deletions.firstEntry().getKey() <= chunkOffset) {
+        offset2Deletions
+            .pollFirstEntry()
+            .getValue()
+            .forEach(o -> nextSplits.add(new DeletionData(o)));
+      }
+    }
+
+    private boolean consumeAllAlignedChunkData(
+        long offset, Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData) 
{
+      if (pageIndex2ChunkData.isEmpty()) {
+        return false;
+      }
+
+      Set<ChunkData> allChunkData = new HashSet<>();
+      for (Map.Entry<Integer, List<AlignedChunkData>> entry : 
pageIndex2ChunkData.entrySet()) {
+        allChunkData.addAll(entry.getValue());
+      }
+      nextSplits.addAll(allChunkData);
+      pageIndex2ChunkData.clear();
+      return true;
+    }
+
+    private boolean needDecodeChunk(IChunkMetadata chunkMetadata) {
+      return !TimePartitionUtils.getTimePartition(chunkMetadata.getStartTime())
+          
.equals(TimePartitionUtils.getTimePartition(chunkMetadata.getEndTime()));
+    }
+
+    private boolean needDecodePage(PageHeader pageHeader, IChunkMetadata 
chunkMetadata) {
+      if (pageHeader.getStatistics() == null) {
+        return 
!TimePartitionUtils.getTimePartition(chunkMetadata.getStartTime())
+            
.equals(TimePartitionUtils.getTimePartition(chunkMetadata.getEndTime()));
+      }
+      return !TimePartitionUtils.getTimePartition(pageHeader.getStartTime())
+          
.equals(TimePartitionUtils.getTimePartition(pageHeader.getEndTime()));
+    }
+
+    private Pair<long[], Object[]> decodePage(
+        boolean isAligned,
+        ByteBuffer pageData,
+        PageHeader pageHeader,
+        Decoder timeDecoder,
+        Decoder valueDecoder,
+        ChunkHeader chunkHeader)
+        throws IOException {
+      if (isAligned) {
+        TimePageReader timePageReader = new TimePageReader(pageHeader, 
pageData, timeDecoder);
+        long[] times = timePageReader.getNextTimeBatch();
+        return new Pair<>(times, new Object[times.length]);
+      }
+
+      valueDecoder.reset();
+      PageReader pageReader =
+          new PageReader(pageData, chunkHeader.getDataType(), valueDecoder, 
timeDecoder, null);
+      BatchData batchData = pageReader.getAllSatisfiedPageData();
+      long[] times = new long[batchData.length()];
+      Object[] values = new Object[batchData.length()];
+      int index = 0;
+      while (batchData.hasCurrent()) {
+        times[index] = batchData.currentTime();
+        values[index++] = batchData.currentValue();
+        batchData.next();
+      }
+      return new Pair<>(times, values);
+    }
+
+    private void handleEmptyValueChunk(
+        ChunkHeader header,
+        Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData,
+        IChunkMetadata chunkMetadata,
+        boolean isTimeChunkNeedDecode)
+        throws IOException {
+      Set<ChunkData> allChunkData = new HashSet<>();
+      for (Map.Entry<Integer, List<AlignedChunkData>> entry : 
pageIndex2ChunkData.entrySet()) {
+        for (AlignedChunkData alignedChunkData : entry.getValue()) {
+          if (!allChunkData.contains(alignedChunkData)) {
+            alignedChunkData.addValueChunk(header);
+            if (!isTimeChunkNeedDecode) {
+              alignedChunkData.writeEntireChunk(ByteBuffer.allocate(0), 
chunkMetadata);
+            }
+            allChunkData.add(alignedChunkData);
+          }
+        }
+      }
+    }
+
+    /**
+     * handle empty page in aligned chunk, if uncompressedSize and 
compressedSize are both 0, and
+     * the statistics is null, then the page is empty.
+     *
+     * @param pageHeader page header
+     * @return true if the page is empty
+     */
+    private boolean isEmptyPage(PageHeader pageHeader) {
+      return pageHeader.getUncompressedSize() == 0
+          && pageHeader.getCompressedSize() == 0
+          && pageHeader.getStatistics() == null;
+    }
+
+    private TsPrimitiveType[] decodeValuePage(
+        TsFileSequenceReader reader,
+        ChunkHeader chunkHeader,
+        PageHeader pageHeader,
+        long[] times,
+        Decoder valueDecoder)
+        throws IOException {
+      if (pageHeader.getSerializedPageSize() == 0) {
+        return new TsPrimitiveType[times.length];
+      }
+
+      valueDecoder.reset();
+      ByteBuffer pageData = reader.readPage(pageHeader, 
chunkHeader.getCompressionType());
+      ValuePageReader valuePageReader =
+          new ValuePageReader(pageHeader, pageData, chunkHeader.getDataType(), 
valueDecoder);
+      return valuePageReader.nextValueBatch(
+          times); // should be origin time, so recording satisfied length is 
necessary
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
index f898a1f6665..6bfcdfadf44 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java
@@ -191,6 +191,40 @@ public class NonAlignedChunkData implements ChunkData {
     }
   }
 
+  public void writeDecodePage(long[] times, Object[] values, int start, int 
end)
+      throws IOException {
+    pageNumber += 1;
+    dataSize += ReadWriteIOUtils.write(true, stream);
+    dataSize += ReadWriteIOUtils.write(end - start, stream);
+
+    for (int i = start; i < end; i++) {
+      dataSize += ReadWriteIOUtils.write(times[i], stream);
+      switch (chunkHeader.getDataType()) {
+        case INT32:
+          dataSize += ReadWriteIOUtils.write((int) values[i], stream);
+          break;
+        case INT64:
+          dataSize += ReadWriteIOUtils.write((long) values[i], stream);
+          break;
+        case FLOAT:
+          dataSize += ReadWriteIOUtils.write((float) values[i], stream);
+          break;
+        case DOUBLE:
+          dataSize += ReadWriteIOUtils.write((double) values[i], stream);
+          break;
+        case BOOLEAN:
+          dataSize += ReadWriteIOUtils.write((boolean) values[i], stream);
+          break;
+        case TEXT:
+          dataSize += ReadWriteIOUtils.write((Binary) values[i], stream);
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("Data type %s is not supported.", 
chunkHeader.getDataType()));
+      }
+    }
+  }
+
   private void deserializeTsFileData(InputStream stream) throws IOException, 
PageException {
     if (needDecodeChunk) {
       buildChunkWriter(stream);
@@ -283,6 +317,11 @@ public class NonAlignedChunkData implements ChunkData {
     stream.close();
   }
 
+  @Override
+  public String firstMeasurement() {
+    return chunkHeader.getMeasurementID();
+  }
+
   @Override
   public String toString() {
     return "NonAlignedChunkData{"
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
index 7f66c9813f7..2c915c10941 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java
@@ -183,13 +183,13 @@ public class TsFileSplitter {
                   pageIndex2Times.put(pageIndex, times);
                 }
 
-                int satisfiedLength = 0;
+                int start = 0;
                 long endTime =
                     timePartitionSlot.getStartTime()
                         + TimePartitionUtils.getTimePartitionInterval();
                 for (int i = 0; i < times.length; i++) {
                   if (times[i] >= endTime) {
-                    chunkData.writeDecodePage(times, values, satisfiedLength);
+                    chunkData.writeDecodePage(times, values, start, i);
                     if (isAligned) {
                       pageIndex2ChunkData
                           .computeIfAbsent(pageIndex, o -> new ArrayList<>())
@@ -199,16 +199,15 @@ public class TsFileSplitter {
                     }
 
                     timePartitionSlot = 
TimePartitionUtils.getTimePartition(times[i]);
-                    satisfiedLength = 0;
                     endTime =
                         timePartitionSlot.getStartTime()
                             + TimePartitionUtils.getTimePartitionInterval();
                     chunkData =
                         ChunkData.createChunkData(isAligned, curDevice, 
header, timePartitionSlot);
+                    start = i;
                   }
-                  satisfiedLength += 1;
                 }
-                chunkData.writeDecodePage(times, values, satisfiedLength);
+                chunkData.writeDecodePage(times, values, start, times.length);
                 if (isAligned) {
                   pageIndex2ChunkData
                       .computeIfAbsent(pageIndex, o -> new ArrayList<>())
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
new file mode 100644
index 00000000000..857a5a9a4e7
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
+import org.junit.Test;
+
+public class MergedTsFileSplitterTest extends TestBase {
+
+  private List<TsFileData> resultSet = new ArrayList<>();
+
+  @Test
+  public void testSplit() throws IOException {
+    long start = System.currentTimeMillis();
+    MergedTsFileSplitter splitter = new MergedTsFileSplitter(files, 
this::consumeSplit,
+        IoTDBThreadPoolFactory.newThreadPool(16, Integer.MAX_VALUE, 20, 
TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            new IoTThreadFactory("MergedTsFileSplitter"), 
"MergedTsFileSplitter"));
+    try {
+      splitter.splitTsFileByDataPartition();
+      for (TsFileData tsFileData : resultSet) {
+        // System.out.println(tsFileData);
+      }
+    } finally {
+      splitter.close();
+    }
+    System.out.printf("%d splits after %dms\n", resultSet.size(),
+        System.currentTimeMillis() - start);
+  }
+
+  public boolean consumeSplit(TsFileData data) {
+    resultSet.add(data);
+    if (resultSet.size() % 1000 == 0) {
+      System.out.printf("%d chunks split\n", resultSet.size());
+      System.out.printf("Maxmem: %d, freemem: %d\n", 
Runtime.getRuntime().maxMemory(), Runtime.getRuntime().freeMemory());
+    }
+    return true;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
new file mode 100644
index 00000000000..9002b3e6168
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.utils.TimePartitionUtils;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestBase {
+
+  private static final Logger logger = LoggerFactory.getLogger(TestBase.class);
+  public static final String BASE_OUTPUT_PATH = 
"target".concat(File.separator);
+  public static final String PARTIAL_PATH_STRING =
+      "%s" + File.separator + "%d" + File.separator + "%d" + File.separator;
+  public static final String TEST_TSFILE_PATH =
+      BASE_OUTPUT_PATH + "testTsFile".concat(File.separator) + 
PARTIAL_PATH_STRING;
+
+  protected int fileNum = 100;
+  // series number of each file, sn non-aligned series and 1 aligned series 
with sn measurements
+  protected int seriesNum = 1000;
+  // number of chunks of each series in a file, each series has only one chunk 
in a file
+  protected double chunkTimeRangeRatio = 0.3;
+  // the interval between two consecutive points of a series
+  protected long pointInterval = 50_000;
+  protected List<File> files = new ArrayList<>();
+
+  @Before
+  public void setup() throws IOException, WriteProcessException {
+    setupFiles();
+    logger.info("Files set up");
+  }
+
+  @After
+  public void cleanup() {
+    for (File file : files) {
+      file.delete();
+    }
+  }
+
+  public void setupFiles() throws IOException, WriteProcessException {
+
+    for (int i = 0; i < fileNum; i++) {
+      File file = new File(getTestTsFilePath("root.sg1", 0, 0, i));
+      files.add(file);
+
+      try (TsFileWriter writer = new TsFileWriter(file)) {
+        // 3 non-aligned series under d1 and 1 aligned series with 3 
measurements under d2
+        for (int sn = 0; sn < seriesNum; sn++) {
+          writer.registerTimeseries(
+              new Path("d1"), new MeasurementSchema("s" + sn, 
TSDataType.DOUBLE));
+        }
+        List<MeasurementSchema> alignedSchemas = new ArrayList<>();
+        for (int sn = 0; sn < seriesNum; sn++) {
+          alignedSchemas.add(new MeasurementSchema("s" + sn, 
TSDataType.DOUBLE));
+        }
+        writer.registerAlignedTimeseries(new Path("d2"), alignedSchemas);
+
+        long timePartitionInterval = 
TimePartitionUtils.getTimePartitionInterval();
+        long chunkTimeRange = (long) (timePartitionInterval * 
chunkTimeRangeRatio);
+        int chunkPointNum = (int) (chunkTimeRange / pointInterval);
+
+        for (int pn = 0; pn < chunkPointNum; pn++) {
+          long currTime = chunkTimeRange * fileNum + pointInterval * pn;
+          TSRecord record = new TSRecord(currTime, "d1");
+          for (int sn = 0; sn < seriesNum; sn++) {
+            record.addTuple(new DoubleDataPoint("s" + sn, pn * 1.0));
+          }
+          writer.write(record);
+
+          record.deviceId = "d2";
+          writer.writeAligned(record);
+        }
+        writer.flushAllChunkGroups();
+      }
+    }
+  }
+
+  public static String getTestTsFilePath(
+      String logicalStorageGroupName,
+      long VirtualStorageGroupId,
+      long TimePartitionId,
+      long tsFileVersion) {
+    String filePath =
+        String.format(
+            TEST_TSFILE_PATH, logicalStorageGroupName, VirtualStorageGroupId, 
TimePartitionId);
+    return TsFileGeneratorUtils.getTsFilePath(filePath, tsFileVersion);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitterTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitterTest.java
new file mode 100644
index 00000000000..78b61c8d664
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitterTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.Test;
+
+public class TsFileSplitterTest extends TestBase{
+  private List<TsFileData> resultSet = new ArrayList<>();
+
+  @Test
+  public void testSplit() throws IOException {
+    long start = System.currentTimeMillis();
+    for (File file : files) {
+      TsFileSplitter splitter = new TsFileSplitter(file, this::consumeSplit);
+      splitter.splitTsFileByDataPartition();
+    }
+    for (TsFileData tsFileData : resultSet) {
+      // System.out.println(tsFileData);
+    }
+    System.out.printf("%d splits after %dms\n", resultSet.size(), 
System.currentTimeMillis() - start);
+  }
+
+  public boolean consumeSplit(TsFileData data) {
+    resultSet.add(data);
+    return true;
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
index d81c97f8454..c36bb6f34ed 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
@@ -107,7 +107,7 @@ public class WrappedThreadPoolExecutor extends 
ThreadPoolExecutor
         Thread.currentThread().interrupt();
       }
     }
-    if (t != null) {
+    if (t != null && !(t instanceof CancellationException)) {
       logger.error("Exception in thread pool {}", mbeanName, t);
     }
   }
diff --git 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
index 9fca658693e..956bfef93df 100644
--- 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
+++ 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
@@ -321,4 +321,8 @@ public class ChunkHeader {
   public void increasePageNums(int i) {
     numOfPages += i;
   }
+
+  public boolean hasStatistic() {
+    return (getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER;
+  }
 }
diff --git 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
index 1863c986b72..81274c37b18 100644
--- 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
+++ 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java
@@ -65,13 +65,22 @@ public class PageHeader {
     return new PageHeader(uncompressedSize, compressedSize, statistics);
   }
 
+  // for compatibility, the previous implementation does not provide parameter 
'hasStatistic'
   public static PageHeader deserializeFrom(ByteBuffer buffer, TSDataType 
dataType) {
+    return deserializeFrom(buffer, dataType, true);
+  }
+
+  public static PageHeader deserializeFrom(
+      ByteBuffer buffer, TSDataType dataType, boolean hasStatistic) {
     int uncompressedSize = 
ReadWriteForEncodingUtils.readUnsignedVarInt(buffer);
     if (uncompressedSize == 0) { // Empty Page
       return new PageHeader(0, 0, null);
     }
     int compressedSize = ReadWriteForEncodingUtils.readUnsignedVarInt(buffer);
-    Statistics<? extends Serializable> statistics = 
Statistics.deserialize(buffer, dataType);
+    Statistics<? extends Serializable> statistics = null;
+    if (hasStatistic) {
+      statistics = Statistics.deserialize(buffer, dataType);
+    }
     return new PageHeader(uncompressedSize, compressedSize, statistics);
   }
 
diff --git 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index e8925543076..ba4f8f1fe97 100644
--- 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.tsfile.read;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.apache.iotdb.tsfile.compress.IUnCompressor;
 import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
 import org.apache.iotdb.tsfile.exception.TsFileRuntimeException;
 import org.apache.iotdb.tsfile.exception.TsFileStatisticsMistakesException;
@@ -59,6 +58,7 @@ import org.apache.iotdb.tsfile.utils.BloomFilter;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.utils.TsFileUtils;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -1397,15 +1397,7 @@ public class TsFileSequenceReader implements 
AutoCloseable {
   }
 
   public ByteBuffer readPage(PageHeader header, CompressionType type) throws 
IOException {
-    ByteBuffer buffer = readData(-1, header.getCompressedSize());
-    if (header.getUncompressedSize() == 0 || type == 
CompressionType.UNCOMPRESSED) {
-      return buffer;
-    } // FIXME if the buffer is not array-implemented.
-    IUnCompressor unCompressor = IUnCompressor.getUnCompressor(type);
-    ByteBuffer uncompressedBuffer = 
ByteBuffer.allocate(header.getUncompressedSize());
-    unCompressor.uncompress(
-        buffer.array(), buffer.position(), buffer.remaining(), 
uncompressedBuffer.array(), 0);
-    return uncompressedBuffer;
+    return TsFileUtils.uncompressPage(header, type, readData(-1, 
header.getCompressedSize()));
   }
 
   /**
diff --git 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java
 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java
index 03ba354c82d..a84c735e549 100644
--- 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java
+++ 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java
@@ -17,10 +17,14 @@
  */
 package org.apache.iotdb.tsfile.utils;
 
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 public class TsFileUtils {
 
@@ -55,4 +59,16 @@ public class TsFileUtils {
     File folder = 
tsFile.getParentFile().getParentFile().getParentFile().getParentFile();
     return folder.getName().equals("sequence");
   }
+
+  public static ByteBuffer uncompressPage(
+      PageHeader header, CompressionType type, ByteBuffer buffer) throws 
IOException {
+    if (header.getUncompressedSize() == 0 || type == 
CompressionType.UNCOMPRESSED) {
+      return buffer;
+    } // FIXME if the buffer is not array-implemented.
+    IUnCompressor unCompressor = IUnCompressor.getUnCompressor(type);
+    ByteBuffer uncompressedBuffer = 
ByteBuffer.allocate(header.getUncompressedSize());
+    unCompressor.uncompress(
+        buffer.array(), buffer.position(), buffer.remaining(), 
uncompressedBuffer.array(), 0);
+    return uncompressedBuffer;
+  }
 }

Reply via email to