This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch fix_3326
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d23155c81ddce7cf1ff7b5105cec731ed2a65161
Author: jt <[email protected]>
AuthorDate: Wed Jun 1 14:33:40 2022 +0800

    fix: new series in unseq files were not merged into the correct seq files
---
 .../db/engine/merge/task/MergeMultiChunkTask.java  | 265 ++++++++++-----------
 .../iotdb/db/engine/merge/MergeNewSeriesTest.java  | 115 +++++++++
 .../iotdb/db/engine/merge/MergeOverLapTest.java    |  62 +----
 .../iotdb/db/engine/merge/MergeTaskTest.java       |   6 +-
 .../apache/iotdb/db/engine/merge/MergeTest.java    | 216 ++++++++++++-----
 .../iotdb/tsfile/read/TsFileSequenceReader.java    |  83 +++++++
 .../apache/iotdb/tsfile/utils/PreviewIterator.java |  27 +++
 7 files changed, 507 insertions(+), 267 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index 36032f9269..c94bc38a58 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -41,6 +41,8 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PreviewIterator;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -53,10 +55,8 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.PriorityQueue;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
@@ -68,6 +68,7 @@ import static 
org.apache.iotdb.db.utils.MergeUtils.writeBatchPoint;
 import static org.apache.iotdb.db.utils.MergeUtils.writeTVPair;
 import static org.apache.iotdb.db.utils.QueryUtils.modifyChunkMetaData;
 
+@SuppressWarnings("java:S107") // suppress too many args
 public class MergeMultiChunkTask {
 
   private static final Logger logger = 
LoggerFactory.getLogger(MergeMultiChunkTask.class);
@@ -75,38 +76,34 @@ public class MergeMultiChunkTask {
       
IoTDBDescriptor.getInstance().getConfig().getMergeChunkPointNumberThreshold();
 
   private MergeLogger mergeLogger;
+  // this task will be merging the series
   private List<PartialPath> unmergedSeries;
 
+  private String storageGroupName;
   private String taskName;
   private MergeResource resource;
-  private TimeValuePair[] currTimeValuePairs;
+  private MergeContext mergeContext;
+
+  private int concurrentMergeSeriesNum;
   private boolean fullMerge;
 
-  private MergeContext mergeContext;
+  private List<PartialPath> currMergingPaths = new ArrayList<>();
+  // the earliest point of each "currMergingPaths" from unseq files
+  private TimeValuePair[] currUnseqTimeValuePairs;
 
   private AtomicInteger mergedChunkNum = new AtomicInteger();
   private AtomicInteger unmergedChunkNum = new AtomicInteger();
   private int mergedSeriesCnt;
   private double progress;
 
-  private int concurrentMergeSeriesNum;
-  private List<PartialPath> currMergingPaths = new ArrayList<>();
-  // need to be cleared every device
-  private final Map<TsFileSequenceReader, Iterator<Map<String, 
List<ChunkMetadata>>>>
-      measurementChunkMetadataListMapIteratorCache =
+  // the key of the map is a seq file, and the value is an iterator that 
traverse chunk lists of
+  // the current merging device. The chunk lists are ordered by measurements.
+  private final Map<TsFileSequenceReader, PreviewIterator<Pair<String, 
List<ChunkMetadata>>>>
+      seqFileChunkMetadataListIteratorCache =
           new TreeMap<>(
               (o1, o2) ->
                   TsFileManagement.compareFileName(
                       new File(o1.getFileName()), new File(o2.getFileName())));
-  // need to be cleared every device
-  private final Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
-      chunkMetadataListCacheForMerge =
-          new TreeMap<>(
-              (o1, o2) ->
-                  TsFileManagement.compareFileName(
-                      new File(o1.getFileName()), new File(o2.getFileName())));
-
-  private String storageGroupName;
 
   public MergeMultiChunkTask(
       MergeContext context,
@@ -152,8 +149,7 @@ public class MergeMultiChunkTask {
         mergedSeriesCnt += currMergingPaths.size();
         logMergeProgress();
       }
-      measurementChunkMetadataListMapIteratorCache.clear();
-      chunkMetadataListCacheForMerge.clear();
+      seqFileChunkMetadataListIteratorCache.clear();
     }
     if (logger.isInfoEnabled()) {
       logger.info(
@@ -176,15 +172,17 @@ public class MergeMultiChunkTask {
   }
 
   private void mergePaths() throws IOException, MetadataException {
+    // read the first point of each series from unseq files
     IPointReader[] unseqReaders = resource.getUnseqReaders(currMergingPaths);
-    currTimeValuePairs = new TimeValuePair[currMergingPaths.size()];
+    currUnseqTimeValuePairs = new TimeValuePair[currMergingPaths.size()];
     for (int i = 0; i < currMergingPaths.size(); i++) {
       if (unseqReaders[i].hasNextTimeValuePair()) {
-        currTimeValuePairs[i] = unseqReaders[i].currentTimeValuePair();
+        currUnseqTimeValuePairs[i] = unseqReaders[i].currentTimeValuePair();
       }
     }
 
     for (int i = 0; i < resource.getSeqFiles().size(); i++) {
+      // merge unseq data into seq files one by one
       pathsMergeOneFile(i, unseqReaders);
 
       if (Thread.interrupted()) {
@@ -194,14 +192,14 @@ public class MergeMultiChunkTask {
     }
   }
 
-  private String getMaxSensor(List<PartialPath> sensors) {
-    String maxSensor = sensors.get(0).getMeasurement();
-    for (int i = 1; i < sensors.size(); i++) {
-      if (maxSensor.compareTo(sensors.get(i).getMeasurement()) < 0) {
-        maxSensor = sensors.get(i).getMeasurement();
+  private long calculateDeviceMinTime(TsFileResource currTsFile, String 
deviceId) {
+    long currDeviceMinTime = currTsFile.getStartTime(deviceId);
+    for (TimeValuePair timeValuePair : currUnseqTimeValuePairs) {
+      if (timeValuePair != null && timeValuePair.getTimestamp() < 
currDeviceMinTime) {
+        currDeviceMinTime = timeValuePair.getTimestamp();
       }
     }
-    return maxSensor;
+    return currDeviceMinTime;
   }
 
   private void pathsMergeOneFile(int seqFileIdx, IPointReader[] unseqReaders)
@@ -209,30 +207,24 @@ public class MergeMultiChunkTask {
     TsFileResource currTsFile = resource.getSeqFiles().get(seqFileIdx);
     // all paths in one call are from the same device
     String deviceId = currMergingPaths.get(0).getDevice();
-    long currDeviceMinTime = currTsFile.getStartTime(deviceId);
+    long currDeviceMinTime = calculateDeviceMinTime(currTsFile, deviceId);
 
     for (PartialPath path : currMergingPaths) {
       mergeContext.getUnmergedChunkStartTimes().get(currTsFile).put(path, new 
ArrayList<>());
     }
 
-    // if this TsFile receives data later than fileLimitTime, it will overlap 
the next TsFile,
-    // which is forbidden
-    for (TimeValuePair timeValuePair : currTimeValuePairs) {
-      if (timeValuePair != null && timeValuePair.getTimestamp() < 
currDeviceMinTime) {
-        currDeviceMinTime = timeValuePair.getTimestamp();
-      }
-    }
     boolean isLastFile = seqFileIdx + 1 == resource.getSeqFiles().size();
 
     TsFileSequenceReader fileSequenceReader = 
resource.getFileReader(currTsFile);
     List<Modification>[] modifications = new List[currMergingPaths.size()];
     List<ChunkMetadata>[] seqChunkMeta = new List[currMergingPaths.size()];
-    Iterator<Map<String, List<ChunkMetadata>>> 
measurementChunkMetadataListMapIterator =
-        measurementChunkMetadataListMapIteratorCache.computeIfAbsent(
+
+    PreviewIterator<Pair<String, List<ChunkMetadata>>> 
measurementChunkMetadataListIterator =
+        seqFileChunkMetadataListIteratorCache.computeIfAbsent(
             fileSequenceReader,
             (tsFileSequenceReader -> {
               try {
-                return 
tsFileSequenceReader.getMeasurementChunkMetadataListMapIterator(deviceId);
+                return 
tsFileSequenceReader.getMeasurementChunkMetadataListIterator(deviceId);
               } catch (IOException e) {
                 logger.error(
                     "unseq compaction task {}, 
getMeasurementChunkMetadataListMapIterator meets error. iterator create 
failed.",
@@ -241,70 +233,12 @@ public class MergeMultiChunkTask {
                 return null;
               }
             }));
-    if (measurementChunkMetadataListMapIterator == null) {
+    if (measurementChunkMetadataListIterator == null) {
       return;
     }
 
-    String lastSensor = getMaxSensor(currMergingPaths);
-    String currSensor = null;
-    Map<String, List<ChunkMetadata>> measurementChunkMetadataListMap = new 
TreeMap<>();
-    // find all sensor to merge in order, if exceed, then break
-    while (currSensor == null || currSensor.compareTo(lastSensor) < 0) {
-      measurementChunkMetadataListMap =
-          chunkMetadataListCacheForMerge.computeIfAbsent(
-              fileSequenceReader, tsFileSequenceReader -> new TreeMap<>());
-      // if empty, get measurementChunkMetadataList block to use later
-      if (measurementChunkMetadataListMap.isEmpty()) {
-        // if do not have more sensor, just break
-        if (measurementChunkMetadataListMapIterator.hasNext()) {
-          
measurementChunkMetadataListMap.putAll(measurementChunkMetadataListMapIterator.next());
-        } else {
-          break;
-        }
-      }
-
-      Iterator<Entry<String, List<ChunkMetadata>>> 
measurementChunkMetadataListEntryIterator =
-          measurementChunkMetadataListMap.entrySet().iterator();
-      while (measurementChunkMetadataListEntryIterator.hasNext()) {
-        Entry<String, List<ChunkMetadata>> measurementChunkMetadataListEntry =
-            measurementChunkMetadataListEntryIterator.next();
-        currSensor = measurementChunkMetadataListEntry.getKey();
-
-        // fill modifications and seqChunkMetas to be used later
-        for (int i = 0; i < currMergingPaths.size(); i++) {
-          if (currMergingPaths.get(i).getMeasurement().equals(currSensor)) {
-            modifications[i] = resource.getModifications(currTsFile, 
currMergingPaths.get(i));
-            seqChunkMeta[i] = measurementChunkMetadataListEntry.getValue();
-            modifyChunkMetaData(seqChunkMeta[i], modifications[i]);
-            for (ChunkMetadata chunkMetadata : seqChunkMeta[i]) {
-              resource.updateStartTime(currTsFile, deviceId, 
chunkMetadata.getStartTime());
-              resource.updateEndTime(currTsFile, deviceId, 
chunkMetadata.getEndTime());
-            }
-
-            if (Thread.interrupted()) {
-              Thread.currentThread().interrupt();
-              return;
-            }
-            break;
-          }
-        }
-
-        // current sensor larger than last needed sensor, just break out to 
outer loop
-        if (currSensor.compareTo(lastSensor) > 0) {
-          break;
-        } else {
-          measurementChunkMetadataListEntryIterator.remove();
-        }
-      }
-    }
-    // update measurementChunkMetadataListMap
-    chunkMetadataListCacheForMerge.put(fileSequenceReader, 
measurementChunkMetadataListMap);
-
-    List<Integer> unskippedPathIndices = filterNoDataPaths(seqChunkMeta, 
seqFileIdx);
-
-    if (unskippedPathIndices.isEmpty()) {
-      return;
-    }
+    readChunkMetaAndModifications(
+        measurementChunkMetadataListIterator, modifications, seqChunkMeta, 
currTsFile, deviceId);
 
     RestorableTsFileIOWriter mergeFileWriter = 
resource.getMergeFileWriter(currTsFile, false);
     for (PartialPath path : currMergingPaths) {
@@ -328,18 +262,57 @@ public class MergeMultiChunkTask {
     }
   }
 
-  private List<Integer> filterNoDataPaths(List[] seqChunkMeta, int seqFileIdx) 
{
-    // if the last seqFile does not contains this series but the unseqFiles 
do, data of this
-    // series should also be written into a new chunk
-    List<Integer> ret = new ArrayList<>();
-    for (int i = 0; i < currMergingPaths.size(); i++) {
-      if ((seqChunkMeta[i] == null || seqChunkMeta[i].isEmpty())
-          && !(seqFileIdx + 1 == resource.getSeqFiles().size() && 
currTimeValuePairs[i] != null)) {
-        continue;
+  private void readChunkMetaAndModifications(
+      PreviewIterator<Pair<String, List<ChunkMetadata>>> 
measurementChunkMetadataListIterator,
+      List<Modification>[] modifications,
+      List<ChunkMetadata>[] seqChunkMeta,
+      TsFileResource currTsFile,
+      String deviceId) {
+    if (!measurementChunkMetadataListIterator.hasNext()) {
+      return;
+    }
+    Pair<String, List<ChunkMetadata>> nextFileMeasurement =
+        measurementChunkMetadataListIterator.previewNext();
+    boolean noMoreMeasurement = false;
+    int i = 0;
+    while (!noMoreMeasurement) {
+      String currMergingMeasurement = currMergingPaths.get(i).getMeasurement();
+      if (currMergingMeasurement.equals(nextFileMeasurement.left)) {
+        // current series is found in the file
+        modifications[i] = resource.getModifications(currTsFile, 
currMergingPaths.get(i));
+        seqChunkMeta[i] = nextFileMeasurement.right;
+        modifyChunkMetaData(seqChunkMeta[i], modifications[i]);
+        for (ChunkMetadata chunkMetadata : seqChunkMeta[i]) {
+          resource.updateStartTime(currTsFile, deviceId, 
chunkMetadata.getStartTime());
+          resource.updateEndTime(currTsFile, deviceId, 
chunkMetadata.getEndTime());
+        }
+
+        // move both cursors to the next
+        i++;
+        measurementChunkMetadataListIterator.next();
+      } else if 
(currMergingPaths.get(i).getMeasurement().compareTo(nextFileMeasurement.left) < 
0) {
+        // current series cannot be found in the file, move to the next 
merging series
+        i++;
+      } else {
+        // check the next series in the file
+        measurementChunkMetadataListIterator.next();
+      }
+
+      if (measurementChunkMetadataListIterator.hasNext()) {
+        nextFileMeasurement = 
measurementChunkMetadataListIterator.previewNext();
+      } else {
+        noMoreMeasurement = true;
       }
-      ret.add(i);
+
+      if (i >= currMergingPaths.size()) {
+        noMoreMeasurement = true;
+      }
+    }
+
+    if (Thread.interrupted()) {
+      Thread.currentThread().interrupt();
+      return;
     }
-    return ret;
   }
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
@@ -355,8 +328,9 @@ public class MergeMultiChunkTask {
     int[] ptWrittens = new int[seqChunkMeta.length];
     int mergeChunkSubTaskNum =
         IoTDBDescriptor.getInstance().getConfig().getMergeChunkSubThreadNum();
-    MetaListEntry[] metaListEntries = new 
MetaListEntry[currMergingPaths.size()];
-    PriorityQueue<Integer>[] chunkIdxHeaps = new 
PriorityQueue[mergeChunkSubTaskNum];
+    MetaListEntry[] seqMetaListEntries = new 
MetaListEntry[currMergingPaths.size()];
+    // split merge task into sub-tasks
+    PriorityQueue<Integer>[] pathIdxHeaps = new 
PriorityQueue[mergeChunkSubTaskNum];
 
     // if merge path is smaller than mergeChunkSubTaskNum, will use merge path 
number.
     // so thread are not wasted.
@@ -365,18 +339,20 @@ public class MergeMultiChunkTask {
     }
 
     for (int i = 0; i < mergeChunkSubTaskNum; i++) {
-      chunkIdxHeaps[i] = new PriorityQueue<>();
+      pathIdxHeaps[i] = new PriorityQueue<>();
     }
     int idx = 0;
     for (int i = 0; i < currMergingPaths.size(); i++) {
-      chunkIdxHeaps[idx % mergeChunkSubTaskNum].add(i);
+      // assign paths to sub-tasks in a round-robin way
+      pathIdxHeaps[idx % mergeChunkSubTaskNum].add(i);
       if (seqChunkMeta[i] == null || seqChunkMeta[i].isEmpty()) {
         continue;
       }
 
       MetaListEntry entry = new MetaListEntry(i, seqChunkMeta[i]);
+      // move to the first chunk metadata
       entry.next();
-      metaListEntries[i] = entry;
+      seqMetaListEntries[i] = entry;
       idx++;
       ptWrittens[i] = 0;
     }
@@ -390,8 +366,8 @@ public class MergeMultiChunkTask {
           MergeManager.getINSTANCE()
               .submitChunkSubTask(
                   new MergeChunkHeapTask(
-                      chunkIdxHeaps[i],
-                      metaListEntries,
+                      pathIdxHeaps[i],
+                      seqMetaListEntries,
                       ptWrittens,
                       reader,
                       mergeFileWriter,
@@ -531,12 +507,12 @@ public class MergeMultiChunkTask {
       IChunkWriter chunkWriter, IPointReader unseqReader, long timeLimit, int 
pathIdx)
       throws IOException {
     int ptWritten = 0;
-    while (currTimeValuePairs[pathIdx] != null
-        && currTimeValuePairs[pathIdx].getTimestamp() < timeLimit) {
-      writeTVPair(currTimeValuePairs[pathIdx], chunkWriter);
+    while (currUnseqTimeValuePairs[pathIdx] != null
+        && currUnseqTimeValuePairs[pathIdx].getTimestamp() < timeLimit) {
+      writeTVPair(currUnseqTimeValuePairs[pathIdx], chunkWriter);
       ptWritten++;
       unseqReader.nextTimeValuePair();
-      currTimeValuePairs[pathIdx] =
+      currUnseqTimeValuePairs[pathIdx] =
           unseqReader.hasNextTimeValuePair() ? 
unseqReader.currentTimeValuePair() : null;
     }
     return ptWritten;
@@ -568,14 +544,14 @@ public class MergeMultiChunkTask {
       // merge data in batch and data in unseqReader
       boolean overwriteSeqPoint = false;
       // unseq point.time <= sequence point.time, write unseq point
-      while (currTimeValuePairs[pathIdx] != null
-          && currTimeValuePairs[pathIdx].getTimestamp() <= time) {
-        writeTVPair(currTimeValuePairs[pathIdx], chunkWriter);
-        if (currTimeValuePairs[pathIdx].getTimestamp() == time) {
+      while (currUnseqTimeValuePairs[pathIdx] != null
+          && currUnseqTimeValuePairs[pathIdx].getTimestamp() <= time) {
+        writeTVPair(currUnseqTimeValuePairs[pathIdx], chunkWriter);
+        if (currUnseqTimeValuePairs[pathIdx].getTimestamp() == time) {
           overwriteSeqPoint = true;
         }
         unseqReader.nextTimeValuePair();
-        currTimeValuePairs[pathIdx] =
+        currUnseqTimeValuePairs[pathIdx] =
             unseqReader.hasNextTimeValuePair() ? 
unseqReader.currentTimeValuePair() : null;
         cnt++;
       }
@@ -590,21 +566,21 @@ public class MergeMultiChunkTask {
 
   public class MergeChunkHeapTask implements Callable<Void> {
 
-    private PriorityQueue<Integer> chunkIdxHeap;
+    private PriorityQueue<Integer> pathIdxHeap;
     private MetaListEntry[] metaListEntries;
     private int[] ptWrittens;
     private TsFileSequenceReader reader;
     private RestorableTsFileIOWriter mergeFileWriter;
     private IPointReader[] unseqReaders;
     private TsFileResource currFile;
-    private boolean isLastFile;
     private int taskNum;
     private long endTimeOfCurrentResource;
+    private boolean isLastFile;
 
     private int totalSeriesNum;
 
     public MergeChunkHeapTask(
-        PriorityQueue<Integer> chunkIdxHeap,
+        PriorityQueue<Integer> pathIdxHeap,
         MetaListEntry[] metaListEntries,
         int[] ptWrittens,
         TsFileSequenceReader reader,
@@ -614,16 +590,15 @@ public class MergeMultiChunkTask {
         boolean isLastFile,
         long endTimeOfCurrentResource,
         int taskNum) {
-      this.chunkIdxHeap = chunkIdxHeap;
+      this.pathIdxHeap = pathIdxHeap;
       this.metaListEntries = metaListEntries;
       this.ptWrittens = ptWrittens;
       this.reader = reader;
       this.mergeFileWriter = mergeFileWriter;
       this.unseqReaders = unseqReaders;
       this.currFile = currFile;
-      this.isLastFile = isLastFile;
       this.taskNum = taskNum;
-      this.totalSeriesNum = chunkIdxHeap.size();
+      this.totalSeriesNum = pathIdxHeap.size();
       this.endTimeOfCurrentResource = endTimeOfCurrentResource;
     }
 
@@ -635,8 +610,8 @@ public class MergeMultiChunkTask {
 
     @SuppressWarnings("java:S2445") // avoid reading the same reader 
concurrently
     private void mergeChunkHeap() throws IOException, MetadataException {
-      while (!chunkIdxHeap.isEmpty()) {
-        int pathIdx = chunkIdxHeap.poll();
+      while (!pathIdxHeap.isEmpty()) {
+        int pathIdx = pathIdxHeap.poll();
         PartialPath path = currMergingPaths.get(pathIdx);
         MeasurementSchema measurementSchema = 
IoTDB.metaManager.getSeriesSchema(path);
         IChunkWriter chunkWriter = resource.getChunkWriter(measurementSchema);
@@ -646,12 +621,16 @@ public class MergeMultiChunkTask {
         }
 
         if (metaListEntries[pathIdx] != null) {
+          // the seq file has this series, read a chunk and merge it with 
unseq data
           MetaListEntry metaListEntry = metaListEntries[pathIdx];
           ChunkMetadata currMeta = metaListEntry.current();
           boolean isLastChunk = !metaListEntry.hasNext();
           boolean chunkOverflowed =
               MergeUtils.isChunkOverflowed(
-                  currTimeValuePairs[pathIdx], currMeta, isLastChunk, 
endTimeOfCurrentResource);
+                  currUnseqTimeValuePairs[pathIdx],
+                  currMeta,
+                  isLastChunk,
+                  endTimeOfCurrentResource);
           boolean chunkTooSmall =
               MergeUtils.isChunkTooSmall(
                   ptWrittens[pathIdx], currMeta, isLastChunk, 
minChunkPointNum);
@@ -676,16 +655,22 @@ public class MergeMultiChunkTask {
                   currFile);
 
           if (!isLastChunk) {
+            // put the series back so its remaining chunks can be merged
             metaListEntry.next();
-            chunkIdxHeap.add(pathIdx);
+            pathIdxHeap.add(pathIdx);
             continue;
           }
         }
-        // this only happens when the seqFiles do not contain this series, 
otherwise the remaining
+        // the seq file does not have the series, otherwise the remaining
         // data will be merged with the last chunk in the seqFiles
-        if (isLastFile && currTimeValuePairs[pathIdx] != null) {
+        // writing device-level overlapped data into this file
+        if (currUnseqTimeValuePairs[pathIdx] != null) {
           ptWrittens[pathIdx] +=
-              writeRemainingUnseq(chunkWriter, unseqReaders[pathIdx], 
Long.MAX_VALUE, pathIdx);
+              writeRemainingUnseq(
+                  chunkWriter,
+                  unseqReaders[pathIdx],
+                  isLastFile ? Long.MAX_VALUE : endTimeOfCurrentResource + 1,
+                  pathIdx);
           mergedChunkNum.incrementAndGet();
         }
         // the last merged chunk may still be smaller than the threshold, 
flush it anyway
@@ -707,7 +692,7 @@ public class MergeMultiChunkTask {
 
     public String getProgress() {
       return String.format(
-          "Processed %d/%d series", totalSeriesNum - chunkIdxHeap.size(), 
totalSeriesNum);
+          "Processed %d/%d series", totalSeriesNum - pathIdxHeap.size(), 
totalSeriesNum);
     }
   }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeNewSeriesTest.java 
b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeNewSeriesTest.java
new file mode 100644
index 0000000000..4b1e8942a7
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeNewSeriesTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge;
+
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.task.MergeTask;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * In the test, the unseq file has a new series that has never been written to 
seq files. However,
+ * other timeseries of its belonging device have already been written to the 
seq files. As a result,
+ * the unseq file and seq file are device-level overlapped but not 
series-level overlapped, and the
+ * new series in the unseq file should be written into seq files according to 
their device time
+ * range.
+ */
+public class MergeNewSeriesTest extends MergeTest {
+
+  private MeasurementSchema[] unseqSchemas = new MeasurementSchema[1];
+
+  @Override
+  public void setUp() throws IOException, WriteProcessException, 
MetadataException {
+    measurementNum = 1;
+    deviceNum = 2;
+    seqFileNum = 2;
+    // unseq files are manually prepared because they will have new time series
+    unseqFileNum = 0;
+    super.setUp();
+  }
+
+  @Override
+  protected void prepareSeries() throws MetadataException {
+    super.prepareSeries();
+    unseqSchemas[0] = toMeasurementSchema(1);
+    createTimeseries(deviceIds, unseqSchemas);
+  }
+
+  @Override
+  void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, 
WriteProcessException {
+    // seq file1: root.MergeTest.d0.s0[0,99] root.MergeTest.d1.s0[0,99]
+    // seq file2: root.MergeTest.d0.s0[100,199] root.MergeTest.d1.s0[100,199]
+    super.prepareFiles(seqFileNum, unseqFileNum);
+    // unseq file1: root.MergeTest.d0.s1[0,99]
+    // unseq file2: root.MergeTest.d1.s1[100,199]
+    // unseq1 should be written into seq1 while unseq2 should be written into 
seq2
+    TsFileResource unseq1 = prepareResource(2);
+    unseqResources.add(unseq1);
+    TsFileResource unseq2 = prepareResource(3);
+    unseqResources.add(unseq2);
+
+    prepareFile(unseq1, 0, ptNum, 0, Arrays.copyOfRange(deviceIds, 0, 1), 
unseqSchemas);
+    prepareFile(unseq2, ptNum, ptNum, 0, Arrays.copyOfRange(deviceIds, 1, 2), 
unseqSchemas);
+  }
+
+  @Test
+  public void testFullMerge() throws Exception {
+    MergeTask mergeTask =
+        new MergeTask(
+            new MergeResource(seqResources, unseqResources),
+            TestConstant.OUTPUT_DATA_DIR,
+            (k, v, l) -> {},
+            "test",
+            true,
+            1,
+            MERGE_TEST_SG);
+    mergeTask.call();
+
+    // query root.MergeTest.d0.s1 from seq1
+    List<TsFileResource> resources = 
Collections.singletonList(seqResources.get(0));
+    queryAndCheck(
+        deviceIds[0], unseqSchemas[0], resources, checkResultFunc(0), 
checkResultCntFunc(ptNum));
+
+    // query root.MergeTest.d1.s1 from seq2
+    resources = Collections.singletonList(seqResources.get(1));
+    queryAndCheck(
+        deviceIds[1], unseqSchemas[0], resources, checkResultFunc(0), 
checkResultCntFunc(ptNum));
+
+    // query root.MergeTest.d0.s1 from seq2
+    resources = Collections.singletonList(seqResources.get(1));
+    queryAndCheck(
+        deviceIds[0], unseqSchemas[0], resources, checkResultFunc(0), 
checkResultCntFunc(0));
+
+    // query root.MergeTest.d1.s1 from seq1
+    resources = Collections.singletonList(seqResources.get(0));
+    queryAndCheck(
+        deviceIds[1], unseqSchemas[0], resources, checkResultFunc(0), 
checkResultCntFunc(0));
+  }
+}
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java 
b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
index 003dafe2e2..2cc290549e 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
@@ -20,7 +20,6 @@
 
 package org.apache.iotdb.db.engine.merge;
 
-import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.merge.manage.MergeResource;
 import org.apache.iotdb.db.engine.merge.task.MergeTask;
@@ -75,66 +74,7 @@ public class MergeOverLapTest extends MergeTest {
     FileUtils.deleteDirectory(tempSGDir);
   }
 
-  @Override
-  void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, 
WriteProcessException {
-    for (int i = 0; i < seqFileNum; i++) {
-      File file =
-          new File(
-              TestConstant.OUTPUT_DATA_DIR.concat(
-                  i
-                      + IoTDBConstant.FILE_NAME_SEPARATOR
-                      + i
-                      + IoTDBConstant.FILE_NAME_SEPARATOR
-                      + 0
-                      + IoTDBConstant.FILE_NAME_SEPARATOR
-                      + 0
-                      + ".tsfile"));
-      TsFileResource tsFileResource = new TsFileResource(file);
-      tsFileResource.setClosed(true);
-      tsFileResource.setMinPlanIndex(i);
-      tsFileResource.setMaxPlanIndex(i);
-      seqResources.add(tsFileResource);
-      prepareFile(tsFileResource, i * ptNum, ptNum, 0);
-    }
-    for (int i = 0; i < unseqFileNum; i++) {
-      File file =
-          new File(
-              TestConstant.OUTPUT_DATA_DIR.concat(
-                  (10000 + i)
-                      + IoTDBConstant.FILE_NAME_SEPARATOR
-                      + (10000 + i)
-                      + IoTDBConstant.FILE_NAME_SEPARATOR
-                      + 0
-                      + IoTDBConstant.FILE_NAME_SEPARATOR
-                      + 0
-                      + ".tsfile"));
-      TsFileResource tsFileResource = new TsFileResource(file);
-      tsFileResource.setClosed(true);
-      tsFileResource.setMaxPlanIndex(i + seqFileNum);
-      tsFileResource.setMinPlanIndex(i + seqFileNum);
-      unseqResources.add(tsFileResource);
-      prepareUnseqFile(tsFileResource, i * ptNum, ptNum * (i + 1) / 
unseqFileNum, 10000);
-    }
-    File file =
-        new File(
-            TestConstant.OUTPUT_DATA_DIR.concat(
-                unseqFileNum
-                    + IoTDBConstant.FILE_NAME_SEPARATOR
-                    + unseqFileNum
-                    + IoTDBConstant.FILE_NAME_SEPARATOR
-                    + 0
-                    + IoTDBConstant.FILE_NAME_SEPARATOR
-                    + 0
-                    + ".tsfile"));
-    TsFileResource tsFileResource = new TsFileResource(file);
-    tsFileResource.setClosed(true);
-    tsFileResource.setMinPlanIndex(seqFileNum + unseqFileNum);
-    tsFileResource.setMaxPlanIndex(seqFileNum + unseqFileNum);
-    unseqResources.add(tsFileResource);
-    prepareUnseqFile(tsFileResource, 0, ptNum * unseqFileNum, 20000);
-  }
-
-  private void prepareUnseqFile(
+  protected void prepareUnseqFile(
       TsFileResource tsFileResource, long timeOffset, long ptNum, long 
valueOffset)
       throws IOException, WriteProcessException {
     TsFileWriter fileWriter = new TsFileWriter(tsFileResource.getTsFile());
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java 
b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
index 6f058c4c9c..6c3bd4e1c6 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
@@ -125,14 +125,14 @@ public class MergeTaskTest extends MergeTest {
 
   @Test
   public void testMergeEndTime() throws Exception {
-    List<TsFileResource> testSeqResources = seqResources.subList(0, 3);
+    List<TsFileResource> testSeqResources = seqResources.subList(0, 5);
     List<TsFileResource> testUnseqResource = unseqResources.subList(5, 6);
     MergeTask mergeTask =
         new MergeTask(
             new MergeResource(testSeqResources, testUnseqResource),
             tempSGDir.getPath(),
             (k, v, l) -> {
-              assertEquals(499, k.get(2).getEndTime("root.mergeTest.device1"));
+              assertEquals(499, k.get(4).getEndTime("root.mergeTest.device1"));
             },
             "test",
             false,
@@ -646,7 +646,7 @@ public class MergeTaskTest extends MergeTest {
             tempSGDir.getPath(),
             (k, v, l) -> {
               try (TsFileSequenceReader reader =
-                  new TsFileSequenceReader(k.get(2).getTsFilePath())) {
+                  new TsFileSequenceReader(k.get(0).getTsFilePath())) {
                 List<ChunkMetadata> chunkMetadataList =
                     reader.getChunkMetadataList(
                         new PartialPath(deviceIds[0], 
measurementSchemas[2].getMeasurementId()));
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java 
b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
index 4633178749..10a0842942 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
@@ -27,16 +27,21 @@ import 
org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;
 import org.apache.iotdb.tsfile.write.TsFileWriter;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
@@ -53,6 +58,7 @@ import java.util.Collections;
 import java.util.List;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
+import static org.junit.Assert.assertEquals;
 
 abstract class MergeTest {
 
@@ -62,7 +68,9 @@ abstract class MergeTest {
   int unseqFileNum = 5;
   int measurementNum = 10;
   int deviceNum = 10;
+  // points of each series in a file
   long ptNum = 100;
+  // flush each time "flushInterval" points of all series are written
   long flushInterval = 20;
   TSEncoding encoding = TSEncoding.PLAIN;
 
@@ -100,18 +108,30 @@ abstract class MergeTest {
     MergeManager.getINSTANCE().stop();
   }
 
+  protected MeasurementSchema toMeasurementSchema(int measurementIndex) {
+    return new MeasurementSchema(
+        "sensor" + measurementIndex, TSDataType.DOUBLE, encoding, 
CompressionType.UNCOMPRESSED);
+  }
+
+  protected String toDeviceId(int deviceIndex) {
+    return MERGE_TEST_SG + PATH_SEPARATOR + "device" + deviceIndex;
+  }
+
   protected void prepareSeries() throws MetadataException {
     measurementSchemas = new MeasurementSchema[measurementNum];
     for (int i = 0; i < measurementNum; i++) {
-      measurementSchemas[i] =
-          new MeasurementSchema(
-              "sensor" + i, TSDataType.DOUBLE, encoding, 
CompressionType.UNCOMPRESSED);
+      measurementSchemas[i] = toMeasurementSchema(i);
     }
     deviceIds = new String[deviceNum];
     for (int i = 0; i < deviceNum; i++) {
-      deviceIds[i] = MERGE_TEST_SG + PATH_SEPARATOR + "device" + i;
+      deviceIds[i] = toDeviceId(i);
     }
     IoTDB.metaManager.setStorageGroup(new PartialPath(MERGE_TEST_SG));
+    createTimeseries(deviceIds, measurementSchemas);
+  }
+
+  protected void createTimeseries(String[] deviceIds, MeasurementSchema[] 
measurementSchemas)
+      throws MetadataException {
     for (String device : deviceIds) {
       for (MeasurementSchema measurementSchema : measurementSchemas) {
         PartialPath devicePath = new PartialPath(device);
@@ -125,66 +145,60 @@ abstract class MergeTest {
     }
   }
 
+  protected String toFileName(int fileIndex) {
+    return TestConstant.OUTPUT_DATA_DIR.concat(
+        fileIndex
+            + IoTDBConstant.FILE_NAME_SEPARATOR
+            + fileIndex
+            + IoTDBConstant.FILE_NAME_SEPARATOR
+            + 0
+            + IoTDBConstant.FILE_NAME_SEPARATOR
+            + 0
+            + ".tsfile");
+  }
+
+  protected TsFileResource prepareResource(int fileIndex) {
+    File file = new File(toFileName(fileIndex));
+    TsFileResource tsFileResource = new TsFileResource(file);
+    tsFileResource.setClosed(true);
+    tsFileResource.setMinPlanIndex(fileIndex);
+    tsFileResource.setMaxPlanIndex(fileIndex);
+    tsFileResource.setVersion(fileIndex);
+    return tsFileResource;
+  }
+
+  protected void prepareSeqFile(
+      TsFileResource tsFileResource, long timeOffset, long ptNum, long 
valueOffset)
+      throws IOException, WriteProcessException {
+    prepareFile(tsFileResource, timeOffset, ptNum, valueOffset);
+  }
+
+  protected void prepareUnseqFile(
+      TsFileResource tsFileResource, long timeOffset, long ptNum, long 
valueOffset)
+      throws IOException, WriteProcessException {
+    prepareFile(tsFileResource, timeOffset, ptNum, valueOffset);
+  }
+
   void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, 
WriteProcessException {
+    // seq files, each has "ptNum" points of all series
     for (int i = 0; i < seqFileNum; i++) {
-      File file =
-          new File(
-              TestConstant.OUTPUT_DATA_DIR.concat(
-                  i
-                      + IoTDBConstant.FILE_NAME_SEPARATOR
-                      + i
-                      + IoTDBConstant.FILE_NAME_SEPARATOR
-                      + 0
-                      + IoTDBConstant.FILE_NAME_SEPARATOR
-                      + 0
-                      + ".tsfile"));
-      TsFileResource tsFileResource = new TsFileResource(file);
-      tsFileResource.setClosed(true);
-      tsFileResource.setMinPlanIndex(i);
-      tsFileResource.setMaxPlanIndex(i);
-      tsFileResource.setVersion(i);
+      TsFileResource tsFileResource = prepareResource(i);
       seqResources.add(tsFileResource);
-      prepareFile(tsFileResource, i * ptNum, ptNum, 0);
+      prepareSeqFile(tsFileResource, i * ptNum, ptNum, 0);
     }
+    // unseq files, each overlaps ONE seq file and latter ones has more points
     for (int i = 0; i < unseqFileNum; i++) {
-      File file =
-          new File(
-              TestConstant.OUTPUT_DATA_DIR.concat(
-                  (10000 + i)
-                      + IoTDBConstant.FILE_NAME_SEPARATOR
-                      + (10000 + i)
-                      + IoTDBConstant.FILE_NAME_SEPARATOR
-                      + 0
-                      + IoTDBConstant.FILE_NAME_SEPARATOR
-                      + 0
-                      + ".tsfile"));
-      TsFileResource tsFileResource = new TsFileResource(file);
-      tsFileResource.setClosed(true);
-      tsFileResource.setMinPlanIndex(i + seqFileNum);
-      tsFileResource.setMaxPlanIndex(i + seqFileNum);
-      tsFileResource.setVersion(i + seqFileNum);
+      TsFileResource tsFileResource = prepareResource(seqFileNum + i);
       unseqResources.add(tsFileResource);
-      prepareFile(tsFileResource, i * ptNum, ptNum * (i + 1) / unseqFileNum, 
10000);
+      prepareUnseqFile(tsFileResource, i * ptNum, ptNum * (i + 1) / 
unseqFileNum, 10000);
     }
 
-    File file =
-        new File(
-            TestConstant.OUTPUT_DATA_DIR.concat(
-                unseqFileNum
-                    + IoTDBConstant.FILE_NAME_SEPARATOR
-                    + unseqFileNum
-                    + IoTDBConstant.FILE_NAME_SEPARATOR
-                    + 0
-                    + IoTDBConstant.FILE_NAME_SEPARATOR
-                    + 0
-                    + ".tsfile"));
-    TsFileResource tsFileResource = new TsFileResource(file);
-    tsFileResource.setClosed(true);
-    tsFileResource.setMinPlanIndex(seqFileNum + unseqFileNum);
-    tsFileResource.setMaxPlanIndex(seqFileNum + unseqFileNum);
-    tsFileResource.setVersion(seqFileNum + unseqFileNum);
-    unseqResources.add(tsFileResource);
-    prepareFile(tsFileResource, 0, ptNum * unseqFileNum, 20000);
+    if (unseqFileNum > 0) {
+      // an additional unseq file that overlaps ALL seq files
+      TsFileResource tsFileResource = prepareResource(seqFileNum + 
unseqFileNum);
+      unseqResources.add(tsFileResource);
+      prepareUnseqFile(tsFileResource, 0, ptNum * unseqFileNum, 20000);
+    }
   }
 
   void removeFiles(List<TsFileResource> seqResList, List<TsFileResource> 
unseqResList)
@@ -203,6 +217,17 @@ abstract class MergeTest {
 
   void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum, 
long valueOffset)
       throws IOException, WriteProcessException {
+    prepareFile(tsFileResource, timeOffset, ptNum, valueOffset, deviceIds, 
measurementSchemas);
+  }
+
+  void prepareFile(
+      TsFileResource tsFileResource,
+      long timeOffset,
+      long ptNum,
+      long valueOffset,
+      String[] deviceIds,
+      MeasurementSchema[] measurementSchemas)
+      throws IOException, WriteProcessException {
     File tsfile = tsFileResource.getTsFile();
     if (!tsfile.getParentFile().exists()) {
       Assert.assertTrue(tsfile.getParentFile().mkdirs());
@@ -215,18 +240,18 @@ abstract class MergeTest {
       }
     }
     for (long i = timeOffset; i < timeOffset + ptNum; i++) {
-      for (int j = 0; j < deviceNum; j++) {
-        TSRecord record = new TSRecord(i, deviceIds[j]);
-        for (int k = 0; k < measurementNum; k++) {
+      for (String deviceId : deviceIds) {
+        TSRecord record = new TSRecord(i, deviceId);
+        for (MeasurementSchema measurementSchema : measurementSchemas) {
           record.addTuple(
               DataPoint.getDataPoint(
-                  measurementSchemas[k].getType(),
-                  measurementSchemas[k].getMeasurementId(),
+                  measurementSchema.getType(),
+                  measurementSchema.getMeasurementId(),
                   String.valueOf(i + valueOffset)));
         }
         fileWriter.write(record);
-        tsFileResource.updateStartTime(deviceIds[j], i);
-        tsFileResource.updateEndTime(deviceIds[j], i);
+        tsFileResource.updateStartTime(deviceId, i);
+        tsFileResource.updateEndTime(deviceId, i);
       }
       if ((i + 1) % flushInterval == 0) {
         fileWriter.flushAllChunkGroups();
@@ -234,4 +259,69 @@ abstract class MergeTest {
     }
     fileWriter.close();
   }
+
  /** Callback used by {@code queryAndCheck} to examine one queried data point. */
  @FunctionalInterface
  protected interface QueryResultChecker {
    // time/value are the point's timestamp and raw value; pointIndex is its position in the batch
    void check(long time, Object value, int pointIndex);
  }
+
  /** Callback used by {@code queryAndCheck} to examine the total number of points returned. */
  @FunctionalInterface
  protected interface QueryCntChecker {
    void check(int cnt);
  }
+
+  protected static QueryResultChecker checkResultFunc(long valueOffset) {
+    return (time, value, index) -> assertEquals(time + valueOffset, (double) 
value, 0.001);
+  }
+
+  protected static QueryCntChecker checkResultCntFunc(long expected) {
+    return cnt -> assertEquals(expected, cnt);
+  }
+
+  /**
+   * Query all points of a timeseries and check point by point.
+   *
+   * @param deviceId of the queried timeseries
+   * @param measurementSchema of the queried timeseries
+   * @param resources files to be queried
+   * @param resultChecker how should each point be examined
+   * @param cntChecker how should the final count be examined
+   * @throws IOException
+   * @throws IllegalPathException
+   */
+  protected static void queryAndCheck(
+      String deviceId,
+      MeasurementSchema measurementSchema,
+      List<TsFileResource> resources,
+      QueryResultChecker resultChecker,
+      QueryCntChecker cntChecker)
+      throws IOException, IllegalPathException {
+    QueryContext context = new QueryContext();
+    PartialPath path = new PartialPath(deviceId, 
measurementSchema.getMeasurementId());
+    IBatchReader tsFilesReader =
+        new SeriesRawDataBatchReader(
+            path,
+            measurementSchema.getType(),
+            context,
+            resources,
+            new ArrayList<>(),
+            null,
+            null,
+            true);
+    int cnt = 0;
+    try {
+      while (tsFilesReader.hasNextBatch()) {
+        BatchData batchData = tsFilesReader.nextBatch();
+        for (int i = 0; i < batchData.length(); i++) {
+          long time = batchData.getTimeByIndex(i);
+          Object value = batchData.getDoubleByIndex(i);
+          resultChecker.check(time, value, i);
+          cnt++;
+        }
+      }
+      cntChecker.check(cnt);
+    } finally {
+      tsFilesReader.close();
+    }
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 4d92146d23..cc4116162f 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.tsfile.read.reader.TsFileInput;
 import org.apache.iotdb.tsfile.read.reader.page.PageReader;
 import org.apache.iotdb.tsfile.utils.BloomFilter;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PreviewIterator;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
@@ -1482,6 +1483,88 @@ public class TsFileSequenceReader implements 
AutoCloseable {
     };
   }
 
+  public class DeviceChunkMetaListIterator
+      implements PreviewIterator<Pair<String, List<ChunkMetadata>>> {
+
+    private String device;
+    Queue<Pair<Long, Long>> leafMeasurementNodePositions;
+    Queue<Pair<String, List<ChunkMetadata>>> currentEntryQueue;
+
+    public DeviceChunkMetaListIterator(String device) throws IOException {
+      this.device = device;
+      init();
+    }
+
+    private void init() throws IOException {
+      readFileMetadata();
+
+      MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
+      Pair<MetadataIndexEntry, Long> metadataIndexPair =
+          getMetadataAndEndOffset(metadataIndexNode, device, true, true);
+
+      if (metadataIndexPair == null) {
+        return;
+      }
+
+      leafMeasurementNodePositions = new LinkedList<>();
+      ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), 
metadataIndexPair.right);
+      collectEachLeafMeasurementNodeOffsetRange(buffer, 
leafMeasurementNodePositions);
+      currentEntryQueue = new LinkedList<>();
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (currentEntryQueue != null && !currentEntryQueue.isEmpty()) {
+        return true;
+      }
+      if (leafMeasurementNodePositions == null || 
leafMeasurementNodePositions.isEmpty()) {
+        return false;
+      }
+
+      Pair<Long, Long> startEndPair = leafMeasurementNodePositions.remove();
+      try {
+        ByteBuffer nextBuffer = readData(startEndPair.left, 
startEndPair.right);
+        while (nextBuffer.hasRemaining()) {
+          TimeseriesMetadata timeseriesMetadata =
+              TimeseriesMetadata.deserializeFrom(nextBuffer, true);
+          currentEntryQueue.add(
+              new Pair<>(
+                  timeseriesMetadata.getMeasurementId(),
+                  timeseriesMetadata.getChunkMetadataList()));
+        }
+      } catch (IOException e) {
+        throw new TsFileRuntimeException(
+            "Error occurred while reading a time series metadata block.");
+      }
+
+      return !currentEntryQueue.isEmpty();
+    }
+
+    public Pair<String, List<ChunkMetadata>> next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      return currentEntryQueue.poll();
+    }
+
+    @Override
+    public Pair<String, List<ChunkMetadata>> previewNext() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      return currentEntryQueue.peek();
+    }
+  }
+
  /**
   * Creates an iterator of entries ( measurement -> chunk metadata list ) for one device. When
   * traversing it, you will get chunk metadata lists according to the lexicographic order of the
   * measurements.
   *
   * @param device full path of the device to read
   * @return a lazily-loading iterator over the device's measurement chunk metadata
   * @throws IOException if the file metadata cannot be read
   */
  public PreviewIterator<Pair<String, List<ChunkMetadata>>> getMeasurementChunkMetadataListIterator(
      String device) throws IOException {
    return new DeviceChunkMetaListIterator(device);
  }
+
   private void collectEachLeafMeasurementNodeOffsetRange(
       ByteBuffer buffer, Queue<Pair<Long, Long>> queue) throws IOException {
     try {
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PreviewIterator.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PreviewIterator.java
new file mode 100644
index 0000000000..96c4a2c2a6
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PreviewIterator.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.utils;
+
+import java.util.Iterator;
+
/**
 * An {@link Iterator} that can also look at the next value without moving the cursor.
 *
 * @param <T> type of the iterated elements
 */
public interface PreviewIterator<T> extends Iterator<T> {

  /**
   * Returns the element that the next call to {@link #next()} would return, without advancing
   * the iterator.
   *
   * @return the next element
   */
  T previewNext();
}

Reply via email to