This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c30bb7c00d [IOTDB-3296] ext-pipe suport .mods file (#6102)
c30bb7c00d is described below

commit c30bb7c00de32bc8caf0e764c3702a52880cec59
Author: Jamber <[email protected]>
AuthorDate: Wed Aug 10 11:46:48 2022 +0800

    [IOTDB-3296] ext-pipe suport .mods file (#6102)
---
 .../iotdb/commons/concurrent/ThreadName.java       |   3 +-
 .../iotdb/db/sync/datasource/AbstractOpBlock.java  |  16 +-
 .../iotdb/db/sync/datasource/DeletionGroup.java    | 242 +++++++++++
 .../iotdb/db/sync/datasource/ModsfileOpBlock.java  |  53 ---
 .../iotdb/db/sync/datasource/PipeOpManager.java    |   6 +-
 .../iotdb/db/sync/datasource/TsFileOpBlock.java    | 463 +++++++++++++++++----
 .../iotdb/db/sync/externalpipe/ExtPipePlugin.java  |  15 +-
 .../db/sync/externalpipe/ExtPipePluginManager.java |   4 +-
 .../iotdb/db/sync/pipedata/TsFilePipeData.java     |  12 +-
 .../db/sync/datasource/DeletionGroupTest.java      | 231 ++++++++++
 .../db/sync/datasource/PipeOpManagerTest.java      | 226 +++++++++-
 .../db/sync/datasource/TsFileOpBlockTest.java      | 372 ++++++++++++++++-
 12 files changed, 1454 insertions(+), 189 deletions(-)

diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 0e9af94a30..66f312fad8 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -82,7 +82,8 @@ public enum ThreadName {
   MPP_DATA_EXCHANGE_RPC_SERVER("MPPDataExchangeRPC"),
   MPP_DATA_EXCHANGE_RPC_CLIENT("MPPDataExchangeRPC-Client"),
   INTERNAL_SERVICE_RPC_SERVER("InternalServiceRPC"),
-  INTERNAL_SERVICE_RPC_CLIENT("InternalServiceRPC-Client");
+  INTERNAL_SERVICE_RPC_CLIENT("InternalServiceRPC-Client"),
+  EXT_PIPE_PLUGIN_WORKER("ExtPipePlugin-Worker");
 
   private final String name;
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/datasource/AbstractOpBlock.java 
b/server/src/main/java/org/apache/iotdb/db/sync/datasource/AbstractOpBlock.java
index d4aa103ba5..4823ad5700 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/datasource/AbstractOpBlock.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/datasource/AbstractOpBlock.java
@@ -35,7 +35,7 @@ public abstract class AbstractOpBlock implements 
Comparable<AbstractOpBlock> {
   protected String storageGroup;
   long filePipeSerialNumber;
 
-  boolean closed = true;
+  boolean closed = false;
 
   // record First Entry's index
   protected long beginIndex = -1;
@@ -93,7 +93,7 @@ public abstract class AbstractOpBlock implements 
Comparable<AbstractOpBlock> {
 
   /** release current class' resource */
   public void close() {
-    closed = false;
+    closed = true;
   };
 
   public boolean isClosed() {
@@ -107,4 +107,16 @@ public abstract class AbstractOpBlock implements 
Comparable<AbstractOpBlock> {
   public void setFilePipeSerialNumber(long filePipeSerialNumber) {
     this.filePipeSerialNumber = filePipeSerialNumber;
   }
+
+  @Override
+  public String toString() {
+    return "storageGroup="
+        + storageGroup
+        + ", filePipeSerialNumber="
+        + filePipeSerialNumber
+        + ", beginIndex="
+        + beginIndex
+        + ", dataCount="
+        + dataCount;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/datasource/DeletionGroup.java 
b/server/src/main/java/org/apache/iotdb/db/sync/datasource/DeletionGroup.java
new file mode 100644
index 0000000000..5058856627
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/datasource/DeletionGroup.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.sync.datasource;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * This class provides below functions
+ *
+ * <p>1) Save many deletion time-intervals.
+ *
+ * <p>2) Merge overlap intervals to 1 interval.
+ *
+ * <p>3) Check whether 1 time-range's data has been deleted according to saved 
deletion
+ * time-intervals.
+ *
+ * <p>4) Check whether 1 time-point's data has been deleted according to saved 
deletion
+ * time-intervals.
+ *
+ * <p>5) For time-ascending batch data, provide better-performance method to 
check whether 1
+ * time-point's data has been deleted.
+ */
+public class DeletionGroup {
+  // TreeMap: StartTime => EndTime
+  private TreeMap<Long, Long> delIntervalMap;
+
+  public enum DeletedType {
+    NO_DELETED, // Mo data has been deleted
+    PARTIAL_DELETED, // Partial data has been deleted
+    FULL_DELETED // All data has been deleted
+  }
+
+  public static class IntervalCursor {
+    Iterator<Map.Entry<Long, Long>> iter = null;
+    boolean subsequentNoDelete = false;
+    public long startTime;
+    public long endTime;
+
+    public void reset() {
+      iter = null;
+      subsequentNoDelete = false;
+    }
+  }
+
+  public DeletionGroup() {
+    delIntervalMap = new TreeMap<>();
+  }
+
+  /**
+   * Insert delete time interval data for every deletion.
+   *
+   * @param startTime
+   * @param endTime
+   */
+  public void addDelInterval(long startTime, long endTime) {
+    if (startTime > endTime) {
+      throw new IllegalArgumentException("addDelInterval(), error: startTime > 
endTime.");
+    }
+
+    // == pay attention, intervalMap's Entries are not overlap.
+    Map.Entry<Long, Long> startEntry = delIntervalMap.floorEntry(startTime);
+    Map.Entry<Long, Long> endEntry = delIntervalMap.floorEntry(endTime);
+
+    if ((startEntry != null) && (startTime <= startEntry.getValue())) {
+      startTime = startEntry.getKey();
+    }
+    if ((endEntry != null) && (endTime < endEntry.getValue())) {
+      endTime = endEntry.getValue();
+    }
+
+    // == find existing overlap entries and remove them
+    Map<Long, Long> overlapEntries = delIntervalMap.subMap(startTime, true, 
endTime, true);
+    Iterator<Map.Entry<Long, Long>> iter = 
overlapEntries.entrySet().iterator();
+    while (iter.hasNext()) {
+      iter.next();
+      iter.remove();
+    }
+
+    delIntervalMap.put(startTime, endTime); // add new deletion interval
+  }
+
+  /**
+   * If this object has no deletion data (i.e delIntervalMap is empty), return 
true
+   *
+   * @return
+   */
+  public boolean isEmpty() {
+    return delIntervalMap.isEmpty();
+  }
+
+  /**
+   * Check the deletion-state of the data-points of specific time range 
according to the info of
+   * .mods
+   *
+   * @param startTime - the start time of data set, inclusive
+   * @param endTime - the end time of data set, inclusive
+   * @return - Please refer to the definition of DeletedType
+   */
+  public DeletedType checkDeletedState(long startTime, long endTime) {
+    if (delIntervalMap.isEmpty()) {
+      return DeletedType.NO_DELETED;
+    }
+
+    if (startTime > endTime) {
+      throw new IllegalArgumentException("checkDeletedState(), error: 
startTime > endTime.");
+    }
+
+    Map.Entry<Long, Long> startEntry = delIntervalMap.floorEntry(startTime);
+    Map.Entry<Long, Long> endEntry = delIntervalMap.floorEntry(endTime);
+
+    if (!Objects.equals(startEntry, endEntry)) {
+      return DeletedType.PARTIAL_DELETED;
+    }
+
+    // == when (startEntry == endEntry == null)
+    if (startEntry == null) {
+      return DeletedType.NO_DELETED;
+    }
+
+    if (startTime > startEntry.getValue()) {
+      return DeletedType.NO_DELETED;
+    }
+
+    if (endTime <= startEntry.getValue()) {
+      return DeletedType.FULL_DELETED;
+    }
+
+    return DeletedType.PARTIAL_DELETED;
+  }
+
+  /**
+   * Check whether this timestamp's data has been deleted according to .mods 
info and data timestamp
+   *
+   * @param ts - data timestamp
+   * @return
+   */
+  public boolean isDeleted(long ts) {
+    if (delIntervalMap.isEmpty()) {
+      return false;
+    }
+
+    Map.Entry<Long, Long> entry = delIntervalMap.floorEntry(ts);
+    if (entry == null) {
+      return false;
+    }
+
+    if (ts > entry.getValue()) {
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Check whether ascending timestamp batch data have been deleted according 
to .mods info. This
+   * method has better performance than method isDeleted(long ts) for 
time-ascending bath data.
+   *
+   * <p>Note1: This method is only used for processing time-ascending batch 
data.
+   *
+   * <p>Note2: Input parameter intervalCursor must be 1 variable. For first 
calling this method,
+   * need use new variable intervalCursor or call intervalCursor.reset(). Then 
continue using same
+   * variable intervalCursor for consequent calling.
+   *
+   * @param ts
+   * @param intervalCursor
+   * @return
+   */
+  public boolean isDeleted(long ts, IntervalCursor intervalCursor) {
+    if (delIntervalMap.isEmpty()) {
+      return false;
+    }
+
+    // == for first calling
+    if (intervalCursor.iter == null) {
+      Long floorKey = delIntervalMap.floorKey(ts);
+      if (floorKey == null) {
+        intervalCursor.iter = delIntervalMap.entrySet().iterator();
+        intervalCursor.startTime = delIntervalMap.firstKey();
+        intervalCursor.endTime = delIntervalMap.firstEntry().getValue();
+        return false;
+      }
+
+      intervalCursor.iter = delIntervalMap.tailMap(floorKey, 
true).entrySet().iterator();
+      Map.Entry<Long, Long> entry = intervalCursor.iter.next();
+      intervalCursor.startTime = entry.getKey();
+      intervalCursor.endTime = entry.getValue();
+    }
+
+    if (intervalCursor.subsequentNoDelete) {
+      return false;
+    }
+
+    while (true) {
+      if (ts < intervalCursor.startTime) {
+        return false;
+      }
+      if (ts <= intervalCursor.endTime) {
+        return true;
+      }
+
+      if (intervalCursor.iter.hasNext()) {
+        Map.Entry<Long, Long> entry = intervalCursor.iter.next();
+        intervalCursor.startTime = entry.getKey();
+        intervalCursor.endTime = entry.getValue();
+        continue;
+      } else {
+        intervalCursor.subsequentNoDelete = true;
+        break;
+      }
+    }
+
+    return false;
+  }
+
+  @TestOnly
+  public TreeMap<Long, Long> getDelIntervalMap() {
+    return delIntervalMap;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/datasource/ModsfileOpBlock.java 
b/server/src/main/java/org/apache/iotdb/db/sync/datasource/ModsfileOpBlock.java
deleted file mode 100644
index f708806842..0000000000
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/datasource/ModsfileOpBlock.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.iotdb.db.sync.datasource;
-
-import org.apache.iotdb.db.sync.externalpipe.operation.Operation;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ModsfileOpBlock extends AbstractOpBlock {
-  private static final Logger logger = 
LoggerFactory.getLogger(ModsfileOpBlock.class);
-
-  public ModsfileOpBlock(String sg, String modsFileName) {
-    super(sg, -1);
-  }
-
-  @Override
-  public long getDataCount() {
-    if (dataCount >= 0) {
-      return dataCount;
-    }
-    // ToDO:
-    return 0;
-  }
-
-  @Override
-  public Operation getOperation(long index, long length) {
-    return null;
-  }
-
-  @Override
-  public void close() {
-    super.close();
-  }
-}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/datasource/PipeOpManager.java 
b/server/src/main/java/org/apache/iotdb/db/sync/datasource/PipeOpManager.java
index ca6e89f1ed..04d1281ebf 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/datasource/PipeOpManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/datasource/PipeOpManager.java
@@ -74,7 +74,8 @@ public class PipeOpManager {
     pipeOpSgManager.addPipeOpBlock(dataSrcEntry);
   }
 
-  public void appendTsFile(String sgName, String tsFilename, long 
pipeDataSerialNumber)
+  public void appendTsFile(
+      String sgName, String tsFilename, String modsFileFullName, long 
pipeDataSerialNumber)
       throws IOException {
     File file = new File(tsFilename);
     if (!file.exists()) {
@@ -87,7 +88,8 @@ public class PipeOpManager {
       maxFilePipeSerialNumber = pipeDataSerialNumber;
     }
 
-    TsFileOpBlock tsfileDataSrcEntry = new TsFileOpBlock(sgName, tsFilename, 
pipeDataSerialNumber);
+    TsFileOpBlock tsfileDataSrcEntry =
+        new TsFileOpBlock(sgName, tsFilename, modsFileFullName, 
pipeDataSerialNumber);
     appendDataSrc(sgName, tsfileDataSrcEntry);
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/datasource/TsFileOpBlock.java 
b/server/src/main/java/org/apache/iotdb/db/sync/datasource/TsFileOpBlock.java
index 1c7983e128..93d26275b3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/datasource/TsFileOpBlock.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/datasource/TsFileOpBlock.java
@@ -20,9 +20,15 @@
 package org.apache.iotdb.db.sync.datasource;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.sync.externalpipe.operation.InsertOperation;
 import org.apache.iotdb.db.sync.externalpipe.operation.Operation;
+import org.apache.iotdb.tsfile.common.cache.LRUCache;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
@@ -47,21 +53,25 @@ import 
org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
-import org.apache.commons.lang3.tuple.ImmutableTriple;
-import org.apache.commons.lang3.tuple.Triple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import static java.lang.Math.max;
 import static java.lang.Math.min;
+import static 
org.apache.iotdb.db.sync.datasource.DeletionGroup.DeletedType.FULL_DELETED;
+import static 
org.apache.iotdb.db.sync.datasource.DeletionGroup.DeletedType.NO_DELETED;
 
 /** This class will parse 1 TsFile's content to 1 operation block. */
 public class TsFileOpBlock extends AbstractOpBlock {
@@ -69,36 +79,82 @@ public class TsFileOpBlock extends AbstractOpBlock {
 
   // tsFile name
   private String tsFileName;
+  private String modsFileName;
   private TsFileFullReader tsFileFullSeqReader;
 
   // full Timeseries Metadata TreeMap : FileOffset => Pair(DeviceId, 
TimeseriesMetadata)
   private Map<Long, Pair<Path, TimeseriesMetadata>> fullTsMetadataMap;
-  // TreeMap: LocalIndex => Pair(SensorFullPath, ChunkOffset, PointCount)
-  private TreeMap<Long, Triple<String, Long, Long>> indexToChunkInfoMap;
+  // TreeMap: LocalIndex => ChunkInfo (measurementFullPath, ChunkOffset, 
PointCount, deletedFlag)
+  private TreeMap<Long, ChunkInfo> indexToChunkInfoMap;
+
+  // Save all modifications that are from .mods file.
+  // (modificationList == null) means no .mods file or .mods file is empty.
+  Collection<Modification> modificationList;
+  // HashMap: measurement FullPath => DeletionGroup(save deletion info)
+  private Map<String, DeletionGroup> fullPathToDeletionMap;
+
+  // LRUMap: PageOffsetInTsfile => PageData
+  private LRUCache<Long, List<TimeValuePair>> pageCache;
+
+  private boolean dataReady = false;
 
   Decoder timeDecoder =
       Decoder.getDecoderByType(
           
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
           TSDataType.INT64);
 
+  private class ChunkInfo {
+    public String measurementFullPath;
+    public long chunkOffset;
+    public long pointCount;
+    public DeletionGroup.DeletedType deletedFlag = NO_DELETED;
+  }
+
   public TsFileOpBlock(String sg, String tsFileName, long 
pipeDataSerialNumber) throws IOException {
     this(sg, tsFileName, pipeDataSerialNumber, 0);
   }
 
   public TsFileOpBlock(String sg, String tsFileName, long 
pipeDataSerialNumber, long beginIndex)
       throws IOException {
+    this(sg, tsFileName, null, pipeDataSerialNumber, beginIndex);
+  }
+
+  public TsFileOpBlock(String sg, String tsFileName, String modsFileName, long 
pipeDataSerialNumber)
+      throws IOException {
+    this(sg, tsFileName, modsFileName, pipeDataSerialNumber, 0);
+  }
+
+  public TsFileOpBlock(
+      String sg, String tsFileName, String modsFileName, long 
pipeDataSerialNumber, long beginIndex)
+      throws IOException {
     super(sg, beginIndex);
     this.filePipeSerialNumber = pipeDataSerialNumber;
     this.tsFileName = tsFileName;
-    init();
+
+    this.modsFileName = null;
+    if (modsFileName != null) {
+      if (new File(modsFileName).exists()) {
+        this.modsFileName = modsFileName;
+      }
+    }
+
+    pageCache =
+        new LRUCache<Long, List<TimeValuePair>>(3) {
+          @Override
+          public List<TimeValuePair> loadObjectByKey(Long key) throws 
IOException {
+            return null;
+          }
+        };
+
+    calculateDataCount();
   }
 
   /**
-   * Init TsfileDataSrcEntry's dataCount
+   * Calculate TsfileOpBlock's dataCount
    *
    * @throws IOException
    */
-  private void init() throws IOException {
+  private void calculateDataCount() throws IOException {
     // == calculate dataCount according to tsfile
     tsFileFullSeqReader = new TsFileFullReader(this.tsFileName);
     fullTsMetadataMap = tsFileFullSeqReader.getAllTimeseriesMeta(false);
@@ -110,6 +166,8 @@ public class TsFileOpBlock extends AbstractOpBlock {
     for (Pair<Path, TimeseriesMetadata> entry : fullTsMetadataMap.values()) {
       dataCount += entry.right.getStatistics().getCount();
     }
+
+    // == Here, release fullTsMetadataMap for saving memory.
     fullTsMetadataMap = null;
   }
 
@@ -123,6 +181,26 @@ public class TsFileOpBlock extends AbstractOpBlock {
     return dataCount;
   }
 
+  /**
+   * Check the deletion-state of the data-points of specific time range 
according to the info of
+   * .mods.
+   *
+   * @param measurementPath - measurementPath full path without wildcard
+   * @param startTime - the start time of data set, inclusive
+   * @param endTime - the end time of data set, inclusive
+   * @return
+   */
+  private DeletionGroup.DeletedType checkDeletedState(
+      String measurementPath, long startTime, long endTime) {
+    DeletionGroup deletionGroup = getFullPathDeletion(measurementPath);
+
+    if (deletionGroup == null) {
+      return NO_DELETED;
+    }
+
+    return deletionGroup.checkDeletedState(startTime, endTime);
+  }
+
   /**
    * Generate indexToChunkInfoMap for whole TsFile
    *
@@ -137,8 +215,8 @@ public class TsFileOpBlock extends AbstractOpBlock {
       fullTsMetadataMap = tsFileFullSeqReader.getAllTimeseriesMeta(true);
     }
 
-    // chunkOffset => pair(SensorFullPath, chunkPointCount)
-    Map<Long, Pair<String, Long>> offsetToCountMap = new TreeMap<>();
+    // chunkOffset => ChunkInfo (measurementFullPath, ChunkOffset, PointCount, 
deletedFlag)
+    Map<Long, ChunkInfo> offsetToCountMap = new TreeMap<>();
     for (Pair<Path, TimeseriesMetadata> value : fullTsMetadataMap.values()) {
       List<IChunkMetadata> chunkMetaList = value.right.getChunkMetadataList();
 
@@ -148,24 +226,95 @@ public class TsFileOpBlock extends AbstractOpBlock {
 
       for (IChunkMetadata chunkMetadata : chunkMetaList) {
         // traverse every chunk
-        long chunkOffset = chunkMetadata.getOffsetOfChunkHeader();
-        long chunkPointCount = chunkMetadata.getStatistics().getCount();
-        String sensorFullPath = value.left.getFullPath();
-        ;
-        offsetToCountMap.put(chunkOffset, new Pair<>(sensorFullPath, 
chunkPointCount));
+        ChunkInfo chunkInfo = new ChunkInfo();
+        chunkInfo.measurementFullPath = value.left.getFullPath();
+        chunkInfo.chunkOffset = chunkMetadata.getOffsetOfChunkHeader();
+        chunkInfo.pointCount = chunkMetadata.getStatistics().getCount();
+
+        chunkInfo.deletedFlag =
+            checkDeletedState(
+                chunkInfo.measurementFullPath,
+                chunkMetadata.getStatistics().getStartTime(),
+                chunkMetadata.getStatistics().getEndTime());
+
+        offsetToCountMap.put(chunkInfo.chunkOffset, chunkInfo);
       }
     }
 
     indexToChunkInfoMap = new TreeMap<>();
     long localIndex = 0;
-    for (Map.Entry<Long, Pair<String, Long>> entry : 
offsetToCountMap.entrySet()) {
-      Long chunkHeaderOffset = entry.getKey();
-      Long pointCount = entry.getValue().right;
-      String sensorFullPath = entry.getValue().left;
-      indexToChunkInfoMap.put(
-          localIndex, new ImmutableTriple<>(sensorFullPath, chunkHeaderOffset, 
pointCount));
-      localIndex += pointCount;
+    for (ChunkInfo chunkInfo : offsetToCountMap.values()) {
+      indexToChunkInfoMap.put(localIndex, chunkInfo);
+      localIndex += chunkInfo.pointCount;
+    }
+  }
+
+  /**
+   * Generate modificationList using .mods file. Result: (modificationList == 
null) means no .mods
+   * file or .mods file is empty.
+   *
+   * @throws IOException
+   */
+  private void buildModificationList() throws IOException {
+    if (modsFileName == null) {
+      logger.debug("buildModificationList(), modsFileName is null.");
+      modificationList = null;
+      return;
     }
+
+    try (ModificationFile modificationFile = new 
ModificationFile(modsFileName)) {
+      modificationList = modificationFile.getModifications();
+    }
+
+    if (modificationList.isEmpty()) {
+      modificationList = null;
+    }
+  }
+
+  /**
+   * Generate fullPathToDeletionMap for fullPath
+   *
+   * @param fullPath measurement full path without wildcard
+   * @return
+   */
+  private DeletionGroup getFullPathDeletion(String fullPath) {
+    // (fullPathToDeletionMap == null) means modificationList is null or empty
+    if (fullPathToDeletionMap == null) {
+      return null;
+    }
+
+    // Try to get data from cache firstly
+    if (fullPathToDeletionMap.containsKey(fullPath)) {
+      return fullPathToDeletionMap.get(fullPath);
+    }
+
+    // == insert all deletion intervals info to deletionGroup.
+    DeletionGroup deletionGroup = new DeletionGroup();
+    PartialPath partialPath = null;
+    try {
+      partialPath = new PartialPath(fullPath);
+    } catch (IllegalPathException e) {
+      logger.error("getFullPathDeletion(), find invalid fullPath: {}", 
fullPath);
+    }
+
+    if (partialPath != null) {
+      // == Here, has been sure  (modificationList != null) && 
(!modificationList.isEmpty())
+      for (Modification modification : modificationList) {
+        if ((modification instanceof Deletion)
+            && (modification.getPath().matchFullPath(partialPath))) {
+          Deletion deletion = (Deletion) modification;
+          deletionGroup.addDelInterval(deletion.getStartTime(), 
deletion.getEndTime());
+        }
+      }
+    }
+
+    if (deletionGroup.isEmpty()) {
+      deletionGroup = null;
+    }
+
+    fullPathToDeletionMap.put(fullPath, deletionGroup);
+
+    return deletionGroup;
   }
 
   /**
@@ -173,72 +322,142 @@ public class TsFileOpBlock extends AbstractOpBlock {
    * tvPairList for better performance
    *
    * @param chunkHeader
-   * @param indexInChunk
+   * @param startIndexInChunk
    * @param length
+   * @param deletionGroup - if it is not null, need to check whether data 
points have benn deleted
    * @param tvPairList
    * @return
    * @throws IOException
    */
   private long getNonAlignedChunkPoints(
-      ChunkHeader chunkHeader, long indexInChunk, long length, 
List<TimeValuePair> tvPairList)
+      ChunkHeader chunkHeader,
+      long startIndexInChunk,
+      long length,
+      DeletionGroup deletionGroup,
+      List<TimeValuePair> tvPairList)
       throws IOException {
     Decoder valueDecoder =
         Decoder.getDecoderByType(chunkHeader.getEncodingType(), 
chunkHeader.getDataType());
-    int chunkDataSize = chunkHeader.getDataSize();
+    int chunkLeftDataSize = chunkHeader.getDataSize();
 
-    int index = 0;
-    while (chunkDataSize > 0) {
+    long index = 0;
+    while ((chunkLeftDataSize > 0) && ((index - startIndexInChunk) < length)) {
       // begin to traverse every page
-      long filePos = tsFileFullSeqReader.position();
-      boolean hasStatistic = ((chunkHeader.getChunkType() & 0x3F) == 
MetaMarker.CHUNK_HEADER);
+      long pagePosInTsfile = tsFileFullSeqReader.position();
+      boolean pageHeaderHasStatistic =
+          ((chunkHeader.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER);
       PageHeader pageHeader =
-          tsFileFullSeqReader.readPageHeader(chunkHeader.getDataType(), 
hasStatistic);
+          tsFileFullSeqReader.readPageHeader(chunkHeader.getDataType(), 
pageHeaderHasStatistic);
 
       int pageSize = pageHeader.getSerializedPageSize();
-      chunkDataSize -= pageSize;
+      chunkLeftDataSize -= pageSize;
 
-      if (hasStatistic) {
+      if (pageHeaderHasStatistic) {
+        // == check whether whole page is out of  [startIndexInChunk, 
startIndexInChunk + length)
         long pageDataCount = pageHeader.getNumOfValues();
-        if ((index + pageDataCount) < indexInChunk) { // skip this page
-          tsFileFullSeqReader.position(filePos + pageSize);
+        if ((index + pageDataCount) <= startIndexInChunk) { // skip this page
+          tsFileFullSeqReader.position(pagePosInTsfile + pageSize);
           index += pageDataCount;
           continue;
         }
-      }
 
-      ByteBuffer pageData =
-          tsFileFullSeqReader.readPage(pageHeader, 
chunkHeader.getCompressionType());
+        // == check whether whole page has been deleted by .mods file
+        if (deletionGroup == null) { // No deletion related to current chunk
+          continue;
+        }
+
+        DeletionGroup.DeletedType deletedType =
+            deletionGroup.checkDeletedState(pageHeader.getStartTime(), 
pageHeader.getEndTime());
 
-      valueDecoder.reset();
-      PageReader pageReader =
-          new PageReader(pageData, chunkHeader.getDataType(), valueDecoder, 
timeDecoder, null);
-      BatchData batchData = pageReader.getAllSatisfiedPageData();
-      if (chunkHeader.getChunkType() == MetaMarker.CHUNK_HEADER) {
-        logger.debug("points in the page(by pageHeader): " + 
pageHeader.getNumOfValues());
-      } else {
-        logger.debug("points in the page(by batchData): " + 
batchData.length());
+        if (deletedType == FULL_DELETED) {
+          long needCount =
+              min(index + pageDataCount, startIndexInChunk + length)
+                  - max(index, startIndexInChunk);
+          for (long i = 0; i < needCount; i++) {
+            tvPairList.add(null);
+          }
+          tsFileFullSeqReader.position(pagePosInTsfile + pageSize);
+          index += pageDataCount;
+          continue;
+        }
       }
 
-      if (batchData.isEmpty()) {
-        logger.warn("getNonAlignedChunkPoints(), chunk is empty, chunkHeader = 
{}.", chunkHeader);
+      // == At first, try to get page-data from pageCache.
+      List<TimeValuePair> pageTVList = pageCache.get(pagePosInTsfile);
+      // == if pageCache has no data
+      if (pageTVList == null) {
+        // == read the page's all data
+        pageTVList = getNonAlignedPagePoints(pageHeader, chunkHeader, 
valueDecoder, deletionGroup);
+        pageCache.put(pagePosInTsfile, pageTVList);
       }
 
-      batchData.resetBatchData();
-      while (batchData.hasCurrent()) {
-        if (index++ >= indexInChunk) {
-          TimeValuePair timeValuePair =
-              new TimeValuePair(batchData.currentTime(), 
batchData.currentTsPrimitiveType());
-          logger.debug("getNonAlignedChunkPoints(), timeValuePair = {} ", 
timeValuePair);
-          tvPairList.add(timeValuePair);
-        }
-        if ((index - indexInChunk) >= length) { // data point is enough
-          return (index - indexInChunk);
-        }
-        batchData.next();
+      int beginIdxInPage = (int) (max(index, startIndexInChunk) - index);
+      int countInPage = (int) min(pageTVList.size(), length - index + 
startIndexInChunk);
+      tvPairList.addAll(
+          ((LinkedList) pageTVList).subList(beginIdxInPage, beginIdxInPage + 
countInPage));
+
+      index += countInPage;
+    }
+
+    return (index - startIndexInChunk);
+  }
+
+  /**
+   * Parse 1 NonAligned page to get all data points. Note:
+   *
+   * <p>1) New data will be appended to parameter tvPairList for better 
performance
+   *
+   * <p>2) deleted data by .mods will be set to null.
+   *
+   * @param pageHeader
+   * @param chunkHeader
+   * @param valueDecoder
+   * @param deletionGroup
+   * @return
+   * @throws IOException
+   */
+  private List<TimeValuePair> getNonAlignedPagePoints(
+      PageHeader pageHeader,
+      ChunkHeader chunkHeader,
+      Decoder valueDecoder,
+      DeletionGroup deletionGroup)
+      throws IOException {
+    List<TimeValuePair> tvList = new LinkedList<>();
+
+    ByteBuffer pageData =
+        tsFileFullSeqReader.readPage(pageHeader, 
chunkHeader.getCompressionType());
+
+    valueDecoder.reset();
+    PageReader pageReader =
+        new PageReader(pageData, chunkHeader.getDataType(), valueDecoder, 
timeDecoder, null);
+    BatchData batchData = pageReader.getAllSatisfiedPageData();
+    if (chunkHeader.getChunkType() == MetaMarker.CHUNK_HEADER) {
+      logger.debug("points in the page(by pageHeader): " + 
pageHeader.getNumOfValues());
+    } else {
+      logger.debug("points in the page(by batchData): " + batchData.length());
+    }
+
+    if (batchData.isEmpty()) {
+      logger.warn("getNonAlignedChunkPoints(), chunk is empty, chunkHeader = 
{}.", chunkHeader);
+      return tvList;
+    }
+
+    batchData.resetBatchData();
+    DeletionGroup.IntervalCursor intervalCursor = new 
DeletionGroup.IntervalCursor();
+    while (batchData.hasCurrent()) {
+      long ts = batchData.currentTime();
+      if ((deletionGroup != null) && (deletionGroup.isDeleted(ts, 
intervalCursor))) {
+        tvList.add(null);
+      } else {
+        TimeValuePair timeValuePair = new TimeValuePair(ts, 
batchData.currentTsPrimitiveType());
+        logger.debug("getNonAlignedChunkPoints(), timeValuePair = {} ", 
timeValuePair);
+        tvList.add(timeValuePair);
       }
+
+      batchData.next();
     }
 
-    return (index - indexInChunk);
+    return tvList;
   }
 
   /**
@@ -248,6 +467,7 @@ public class TsFileOpBlock extends AbstractOpBlock {
    * @param chunkHeader
    * @param indexInChunk
    * @param lengthInChunk
+   * @param deletionGroup - if it is not null, need to check whether data 
points have benn deleted
    * @param tvPairList
    * @return
    * @throws IOException
@@ -256,6 +476,7 @@ public class TsFileOpBlock extends AbstractOpBlock {
       ChunkHeader chunkHeader,
       long indexInChunk,
       long lengthInChunk,
+      DeletionGroup deletionGroup,
       List<TimeValuePair> tvPairList)
       throws IOException {
     List<long[]> timeBatch = new ArrayList<>();
@@ -324,27 +545,43 @@ public class TsFileOpBlock extends AbstractOpBlock {
   }
 
   /**
-   * get 1 Chunk's partial data points according to indexInChunk & length 
Note: new data will be
-   * appended to parameter tvPairList for better performance
+   * get 1 Chunk's partial data points according to indexInChunk & 
lengthInChunk Note: new data will
+   * be appended to parameter tvPairList for better performance
    *
-   * @param chunkHeaderOffset
+   * @param chunkInfo
    * @param indexInChunk
-   * @param length
+   * @param lengthInChunk
    * @param tvPairList, the got data points will be appended to this List.
    * @return
    * @throws IOException
    */
   private long getChunkPoints(
-      long chunkHeaderOffset, long indexInChunk, long length, 
List<TimeValuePair> tvPairList)
+      ChunkInfo chunkInfo, long indexInChunk, long lengthInChunk, 
List<TimeValuePair> tvPairList)
       throws IOException {
-    tsFileFullSeqReader.position(chunkHeaderOffset);
+
+    // == If whole chunk has been deleted according to .mods file
+    if (chunkInfo.deletedFlag == FULL_DELETED) {
+      for (long i = 0; i < lengthInChunk; i++) {
+        tvPairList.add(null);
+      }
+      return lengthInChunk;
+    }
+
+    tsFileFullSeqReader.position(chunkInfo.chunkOffset);
     byte chunkTypeByte = tsFileFullSeqReader.readMarker();
     ChunkHeader chunkHeader = 
tsFileFullSeqReader.readChunkHeader(chunkTypeByte);
 
+    DeletionGroup deletionGroup = null;
+    if (chunkInfo.deletedFlag != NO_DELETED) {
+      deletionGroup = getFullPathDeletion(chunkInfo.measurementFullPath);
+    }
+
     if (chunkHeader.getDataType() == TSDataType.VECTOR) {
-      return getAlignedChunkPoints(chunkHeader, indexInChunk, length, 
tvPairList);
+      return getAlignedChunkPoints(
+          chunkHeader, indexInChunk, lengthInChunk, deletionGroup, tvPairList);
     } else {
-      return getNonAlignedChunkPoints(chunkHeader, indexInChunk, length, 
tvPairList);
+      return getNonAlignedChunkPoints(
+          chunkHeader, indexInChunk, lengthInChunk, deletionGroup, tvPairList);
     }
   }
 
@@ -365,12 +602,32 @@ public class TsFileOpBlock extends AbstractOpBlock {
     try {
       measurementPath = new MeasurementPath(sensorFullPath);
     } catch (IllegalPathException e) {
-      logger.error("TsfileDataSrcEntry.insertToDataList(), Illegal 
MeasurementPath: {}", "");
+      logger.error("TsFileOpBlock.insertToDataList(), Illegal MeasurementPath: 
{}", "");
       throw new IOException("Illegal MeasurementPath: " + sensorFullPath, e);
     }
     dataList.add(new Pair<>(measurementPath, tvPairList));
   }
 
+  private void prepareData() throws IOException {
+    if (tsFileFullSeqReader == null) {
+      tsFileFullSeqReader = new TsFileFullReader(tsFileName);
+    }
+
+    if (modsFileName != null) {
+      buildModificationList();
+    }
+
+    if ((fullPathToDeletionMap == null) && (modificationList != null)) {
+      fullPathToDeletionMap = new HashMap<>();
+    }
+
+    if (indexToChunkInfoMap == null) {
+      buildIndexToChunkMap();
+    }
+
+    dataReady = true;
+  }
+
   /**
    * Get 1 Operation that contain needed data. Note: 1) Expected data range is 
[index, index+length)
    * 2) Real returned data length can less than input parameter length
@@ -382,19 +639,20 @@ public class TsFileOpBlock extends AbstractOpBlock {
    */
   @Override
   public Operation getOperation(long index, long length) throws IOException {
-    long indexInTsfile = index - beginIndex;
-
-    if (indexInTsfile < 0 || indexInTsfile >= dataCount) {
-      logger.error("TsfileDataSrcEntry.getOperation(), index {} is out of 
range.", index);
-      throw new IOException("index is out of range.");
+    if (closed) {
+      logger.error("TsFileOpBlock.getOperation(), can not access closed 
TsFileOpBlock: {}.", this);
+      throw new IOException("can not access closed TsFileOpBlock: " + this);
     }
 
-    if (tsFileFullSeqReader == null) {
-      tsFileFullSeqReader = new TsFileFullReader(tsFileName);
+    long indexInTsfile = index - beginIndex;
+    if (indexInTsfile < 0 || indexInTsfile >= dataCount) {
+      logger.error("TsFileOpBlock.getOperation(), Error: index {} is out of 
range.", index);
+      // throw new IOException("index is out of range.");
+      return null;
     }
 
-    if (indexToChunkInfoMap == null) {
-      buildIndexToChunkMap();
+    if (!dataReady) {
+      prepareData();
     }
 
     LinkedList<Pair<MeasurementPath, List<TimeValuePair>>> dataList = new 
LinkedList<>();
@@ -404,19 +662,18 @@ public class TsFileOpBlock extends AbstractOpBlock {
     // handle all chunks that contain needed data
     long remain = length;
     while (remain > 0) {
-      Map.Entry<Long, Triple<String, Long, Long>> entry =
-          indexToChunkInfoMap.floorEntry(indexInTsfile);
+      Map.Entry<Long, ChunkInfo> entry = 
indexToChunkInfoMap.floorEntry(indexInTsfile);
       if (entry == null) {
         logger.error(
-            "TsfileDataSrcEntry.getOperation(), indexInTsfile {} if out of 
indexToChunkOffsetMap.",
+            "TsFileOpBlock.getOperation(), indexInTsfile {} if out of 
indexToChunkOffsetMap.",
             indexInTsfile);
         throw new IOException("indexInTsfile is out of range.");
       }
 
-      Long indexInChunk = indexInTsfile - entry.getKey();
-      String sensorFullPath = entry.getValue().getLeft();
-      Long chunkHeaderOffset = entry.getValue().getMiddle();
-      Long chunkPointCount = entry.getValue().getRight();
+      long indexInChunk = indexInTsfile - entry.getKey();
+      ChunkInfo chunkInfo = entry.getValue();
+      String sensorFullPath = chunkInfo.measurementFullPath;
+      long chunkPointCount = chunkInfo.pointCount;
 
       long lengthInChunk = min(chunkPointCount - indexInChunk, remain);
 
@@ -432,17 +689,17 @@ public class TsFileOpBlock extends AbstractOpBlock {
       if (tvPairList == null) {
         tvPairList = new LinkedList<>();
       }
-      long daltaCount = getChunkPoints(chunkHeaderOffset, indexInChunk, 
lengthInChunk, tvPairList);
-      if (daltaCount != lengthInChunk) {
+      long readCount = getChunkPoints(chunkInfo, indexInChunk, lengthInChunk, 
tvPairList);
+      if (readCount != lengthInChunk) {
         logger.error(
-            "TsfileDataSrcEntry.getOperation(), error when read chunk from 
file {}. lengthInChunk={}, daltaCount={}, ",
+            "TsFileOpBlock.getOperation(), error when read chunk from file {}. 
lengthInChunk={}, readCount={}, ",
             indexInTsfile,
             lengthInChunk,
-            daltaCount);
+            readCount);
         throw new IOException("Error read chunk from file:" + indexInTsfile);
       }
 
-      remain -= daltaCount;
+      remain -= readCount;
       indexInTsfile = entry.getKey() + chunkPointCount; // next chunk's local 
index
 
       if (indexInTsfile >= dataCount) { // has reached the end of this Tsfile
@@ -469,6 +726,8 @@ public class TsFileOpBlock extends AbstractOpBlock {
       }
       tsFileFullSeqReader = null;
     }
+
+    dataReady = false;
   }
 
   /** This class is used to read & parse Tsfile */
@@ -601,4 +860,30 @@ public class TsFileOpBlock extends AbstractOpBlock {
       return tsFileMetaData;
     }
   }
+
+  @TestOnly
+  public Collection<Modification> getModificationList() {
+    return modificationList;
+  }
+
+  @TestOnly
+  public Map<String, DeletionGroup> getFullPathToDeletionMap() {
+    return fullPathToDeletionMap;
+  }
+
+  @Override
+  public String toString() {
+    return "storageGroup="
+        + storageGroup
+        + ", tsFileName="
+        + tsFileName
+        + ", modsFileName="
+        + modsFileName
+        + ", filePipeSerialNumber="
+        + filePipeSerialNumber
+        + ", beginIndex="
+        + beginIndex
+        + ", dataCount="
+        + dataCount;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePlugin.java 
b/server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePlugin.java
index 7ef6461191..2bec049a75 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePlugin.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePlugin.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.sync.externalpipe;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
@@ -49,7 +51,6 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -181,13 +182,8 @@ public class ExtPipePlugin {
 
     // == Launch pipe worker threads
     executorService =
-        Executors.newFixedThreadPool(
-            threadNum,
-            r -> {
-              Thread thread = new Thread(r);
-              thread.setName("ExtPipePlugin-worker-" + extPipeTypeName + "-" + 
thread.getId());
-              return thread;
-            });
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            threadNum, ThreadName.EXT_PIPE_PLUGIN_WORKER.getName() + "-" + 
extPipeTypeName);
 
     // == Start threads that will run external PiPeSink plugin
     dataTransmissionTasks = new ArrayList<>(threadNum);
@@ -529,6 +525,9 @@ public class ExtPipePlugin {
       for (Pair<MeasurementPath, List<TimeValuePair>> dataPair : 
operation.getDataList()) {
         MeasurementPath path = dataPair.left;
         for (TimeValuePair tvPair : dataPair.right) {
+          if (tvPair == null) {
+            continue;
+          }
           String[] nodes = path.getNodes();
           long timestampInMs = tvPair.getTimestamp() / timestampDivisor;
           switch (tvPair.getValue().getDataType()) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePluginManager.java
 
b/server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePluginManager.java
index b8ed356e86..5a1b1abe5a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePluginManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/sync/externalpipe/ExtPipePluginManager.java
@@ -195,8 +195,10 @@ public class ExtPipePluginManager {
 
             String sgName = tsFilePipeData.getStorageGroupName();
             String tsFileFullName = tsFilePipeData.getTsFilePath();
+            String modsFileFullName = tsFilePipeData.getModsFilePath();
             try {
-              pipeOpManager.appendTsFile(sgName, tsFileFullName, 
pipeDataSerialNumber);
+              pipeOpManager.appendTsFile(
+                  sgName, tsFileFullName, modsFileFullName, 
pipeDataSerialNumber);
             } catch (IOException e) {
               logger.error("monitorPipeData(), Can not append TsFile: {}" + 
tsFileFullName);
             }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java 
b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java
index 7fbdcadd23..42716766c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java
@@ -104,6 +104,14 @@ public class TsFilePipeData extends PipeData {
     return parentDirPath + File.separator + tsFileName;
   }
 
+  public String getResourceFilePath() {
+    return getTsFilePath() + TsFileResource.RESOURCE_SUFFIX;
+  }
+
+  public String getModsFilePath() {
+    return getTsFilePath() + ModificationFile.FILE_SUFFIX;
+  }
+
   public String getStorageGroupName() {
     return storageGroupName;
   }
@@ -138,8 +146,8 @@ public class TsFilePipeData extends PipeData {
 
   public List<File> getTsFiles(boolean shouldWaitForTsFileClose) throws 
FileNotFoundException {
     File tsFile = new File(getTsFilePath()).getAbsoluteFile();
-    File resource = new File(tsFile.getAbsolutePath() + 
TsFileResource.RESOURCE_SUFFIX);
-    File mods = new File(tsFile.getAbsolutePath() + 
ModificationFile.FILE_SUFFIX);
+    File resource = new File(getResourceFilePath());
+    File mods = new File(getModsFilePath());
 
     List<File> files = new ArrayList<>();
     if (!tsFile.exists()) {
diff --git 
a/server/src/test/java/org/apache/iotdb/db/sync/datasource/DeletionGroupTest.java
 
b/server/src/test/java/org/apache/iotdb/db/sync/datasource/DeletionGroupTest.java
new file mode 100644
index 0000000000..df10ba50fb
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/sync/datasource/DeletionGroupTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.sync.datasource;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static 
org.apache.iotdb.db.sync.datasource.DeletionGroup.DeletedType.FULL_DELETED;
+import static 
org.apache.iotdb.db.sync.datasource.DeletionGroup.DeletedType.NO_DELETED;
+import static 
org.apache.iotdb.db.sync.datasource.DeletionGroup.DeletedType.PARTIAL_DELETED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class DeletionGroupTest {
+
+  private static DeletionGroup deletionGroup1;
+  private static DeletionGroup deletionGroup2; // empty
+
+  @BeforeClass
+  public static void prepareData() {
+    deletionGroup1 = new DeletionGroup();
+
+    // for item 0
+    deletionGroup1.addDelInterval(10, 30);
+    deletionGroup1.addDelInterval(20, 40);
+
+    // for item 3
+    deletionGroup1.addDelInterval(150, 200);
+    deletionGroup1.addDelInterval(150, 200);
+
+    // for item 1
+    deletionGroup1.addDelInterval(50, 50);
+    deletionGroup1.addDelInterval(50, 50);
+
+    // for item 4
+    deletionGroup1.addDelInterval(220, 300);
+    deletionGroup1.addDelInterval(250, 290);
+
+    // for item 2
+    deletionGroup1.addDelInterval(70, 110);
+    deletionGroup1.addDelInterval(70, 80);
+    deletionGroup1.addDelInterval(80, 90);
+    deletionGroup1.addDelInterval(100, 120);
+
+    deletionGroup2 = new DeletionGroup();
+  }
+
+  @Test
+  public void testAddDelInterval() {
+    boolean hasException = false;
+    try {
+      deletionGroup1.addDelInterval(10, 5);
+    } catch (IllegalArgumentException e) {
+      hasException = true;
+    }
+    assertTrue(hasException);
+
+    TreeMap<Long, Long> delIntervalMap = deletionGroup1.getDelIntervalMap();
+    Iterator<Map.Entry<Long, Long>> iter1 = 
delIntervalMap.entrySet().iterator();
+    Map.Entry<Long, Long> entry1 = iter1.next();
+    assertEquals(10, entry1.getKey().longValue());
+    assertEquals(40, entry1.getValue().longValue());
+    entry1 = iter1.next();
+    assertEquals(50, entry1.getKey().longValue());
+    assertEquals(50, entry1.getValue().longValue());
+    entry1 = iter1.next();
+    assertEquals(70, entry1.getKey().longValue());
+    assertEquals(120, entry1.getValue().longValue());
+    entry1 = iter1.next();
+    assertEquals(150, entry1.getKey().longValue());
+    assertEquals(200, entry1.getValue().longValue());
+    entry1 = iter1.next();
+    assertEquals(220, entry1.getKey().longValue());
+    assertEquals(300, entry1.getValue().longValue());
+  }
+
+  @Test
+  public void testCheckDeletedState() {
+    boolean hasException = false;
+    try {
+      deletionGroup1.checkDeletedState(5, 1);
+    } catch (IllegalArgumentException e) {
+      hasException = true;
+    }
+    assertTrue(hasException);
+
+    assertEquals(NO_DELETED, deletionGroup1.checkDeletedState(1, 5));
+    assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(1, 10));
+    assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(2, 15));
+    assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(10, 10));
+    assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(10, 20));
+    assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(30, 40));
+    assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(10, 40));
+    assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(40, 40));
+    assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(35, 45));
+    assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(40, 45));
+
+    assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(50, 50));
+    assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(45, 50));
+    assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(50, 55));
+    assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(45, 55));
+
+    assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(5, 55));
+    assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(5, 500));
+
+    assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(120, 140));
+    assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(120, 150));
+
+    assertEquals(NO_DELETED, deletionGroup1.checkDeletedState(201, 201));
+    assertEquals(NO_DELETED, deletionGroup1.checkDeletedState(400, 500));
+
+    assertEquals(NO_DELETED, deletionGroup1.checkDeletedState(201, 219));
+    assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(201, 220));
+    assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(220, 220));
+    assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(220, 230));
+    assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(230, 250));
+    assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(250, 300));
+    assertEquals(FULL_DELETED, deletionGroup1.checkDeletedState(220, 300));
+    assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(220, 330));
+    assertEquals(PARTIAL_DELETED, deletionGroup1.checkDeletedState(240, 350));
+
+    // == test empty deletionGroup2
+    assertEquals(NO_DELETED, deletionGroup2.checkDeletedState(1, 500));
+  }
+
+  @Test
+  public void testIsDeleted() {
+
+    assertEquals(false, deletionGroup1.isDeleted(5));
+    assertEquals(true, deletionGroup1.isDeleted(10));
+    assertEquals(true, deletionGroup1.isDeleted(20));
+    assertEquals(true, deletionGroup1.isDeleted(40));
+    assertEquals(false, deletionGroup1.isDeleted(45));
+
+    assertEquals(true, deletionGroup1.isDeleted(50));
+
+    assertEquals(false, deletionGroup1.isDeleted(60));
+    assertEquals(true, deletionGroup1.isDeleted(70));
+    assertEquals(true, deletionGroup1.isDeleted(100));
+    assertEquals(true, deletionGroup1.isDeleted(120));
+    assertEquals(false, deletionGroup1.isDeleted(122));
+
+    assertEquals(true, deletionGroup1.isDeleted(220));
+    assertEquals(true, deletionGroup1.isDeleted(250));
+    assertEquals(true, deletionGroup1.isDeleted(300));
+    assertEquals(false, deletionGroup1.isDeleted(400));
+
+    // == test empty deletionGroup2
+    assertEquals(false, deletionGroup2.isDeleted(100));
+  }
+
+  @Test
+  public void testIsDeleted2() {
+    DeletionGroup.IntervalCursor intervalCursor = new 
DeletionGroup.IntervalCursor();
+    assertEquals(false, deletionGroup1.isDeleted(1, intervalCursor));
+    assertEquals(false, deletionGroup1.isDeleted(5, intervalCursor));
+    assertEquals(true, deletionGroup1.isDeleted(10, intervalCursor));
+    assertEquals(true, deletionGroup1.isDeleted(20, intervalCursor));
+    assertEquals(true, deletionGroup1.isDeleted(40, intervalCursor));
+    assertEquals(false, deletionGroup1.isDeleted(45, intervalCursor));
+
+    assertEquals(true, deletionGroup1.isDeleted(50, intervalCursor));
+    assertEquals(false, deletionGroup1.isDeleted(55, intervalCursor));
+
+    assertEquals(true, deletionGroup1.isDeleted(70, intervalCursor));
+    assertEquals(true, deletionGroup1.isDeleted(100, intervalCursor));
+    assertEquals(true, deletionGroup1.isDeleted(120, intervalCursor));
+    assertEquals(false, deletionGroup1.isDeleted(125, intervalCursor));
+
+    assertEquals(true, deletionGroup1.isDeleted(300, intervalCursor));
+    assertEquals(false, deletionGroup1.isDeleted(301, intervalCursor));
+
+    intervalCursor = new DeletionGroup.IntervalCursor();
+    assertEquals(true, deletionGroup1.isDeleted(10, intervalCursor));
+    assertEquals(true, deletionGroup1.isDeleted(20, intervalCursor));
+    assertEquals(true, deletionGroup1.isDeleted(40, intervalCursor));
+    assertEquals(false, deletionGroup1.isDeleted(45, intervalCursor));
+
+    assertEquals(true, deletionGroup1.isDeleted(50, intervalCursor));
+    assertEquals(false, deletionGroup1.isDeleted(55, intervalCursor));
+
+    assertEquals(true, deletionGroup1.isDeleted(70, intervalCursor));
+    assertEquals(true, deletionGroup1.isDeleted(100, intervalCursor));
+    assertEquals(true, deletionGroup1.isDeleted(120, intervalCursor));
+    assertEquals(false, deletionGroup1.isDeleted(125, intervalCursor));
+
+    assertEquals(true, deletionGroup1.isDeleted(300, intervalCursor));
+    assertEquals(false, deletionGroup1.isDeleted(301, intervalCursor));
+
+    intervalCursor.reset();
+    assertEquals(true, deletionGroup1.isDeleted(50, intervalCursor));
+    assertEquals(false, deletionGroup1.isDeleted(55, intervalCursor));
+
+    assertEquals(true, deletionGroup1.isDeleted(70, intervalCursor));
+    assertEquals(true, deletionGroup1.isDeleted(100, intervalCursor));
+    assertEquals(true, deletionGroup1.isDeleted(120, intervalCursor));
+    assertEquals(false, deletionGroup1.isDeleted(125, intervalCursor));
+
+    assertEquals(true, deletionGroup1.isDeleted(300, intervalCursor));
+    assertEquals(false, deletionGroup1.isDeleted(301, intervalCursor));
+
+    intervalCursor.reset();
+    assertEquals(false, deletionGroup1.isDeleted(301, intervalCursor));
+
+    // == test empty deletionGroup2
+    intervalCursor = new DeletionGroup.IntervalCursor();
+    assertEquals(false, deletionGroup2.isDeleted(301, intervalCursor));
+  }
+}
diff --git 
a/server/src/test/java/org/apache/iotdb/db/sync/datasource/PipeOpManagerTest.java
 
b/server/src/test/java/org/apache/iotdb/db/sync/datasource/PipeOpManagerTest.java
index 26ad9e862a..ece3d7aeb4 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/sync/datasource/PipeOpManagerTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/sync/datasource/PipeOpManagerTest.java
@@ -19,6 +19,11 @@
 
 package org.apache.iotdb.db.sync.datasource;
 
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.sync.externalpipe.operation.InsertOperation;
 import org.apache.iotdb.db.sync.externalpipe.operation.Operation;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -37,32 +42,42 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
 
 public class PipeOpManagerTest {
   public static final String TMP_DIR = "target";
   private static final String seqTsFileName1 = TMP_DIR + File.separator + 
"test1.tsfile";
+  private final String seqModsFileName1 = seqTsFileName1 + ".mods";
   private static final String unSeqTsFileName1 = TMP_DIR + File.separator + 
"test2.unseq.tsfile";
+  private final String unSeqModsFileName1 = unSeqTsFileName1 + ".mods";
   public static final String DEFAULT_TEMPLATE = "template";
+  public final List<String> delFileList = new LinkedList<>();
 
   @Before
   public void prepareTestData() throws Exception {
     createSeqTsfile(seqTsFileName1);
+    delFileList.add(seqTsFileName1);
+    creatSeqModsFile(seqModsFileName1);
+    delFileList.add(seqModsFileName1);
+
     createUnSeqTsfile(unSeqTsFileName1);
+    delFileList.add(unSeqTsFileName1);
+    creatUnSeqModsFile(unSeqModsFileName1);
+    delFileList.add(unSeqModsFileName1);
   }
 
   @After
   public void removeTestData() throws Exception {
-    File file = new File(seqTsFileName1);
-    if (file.exists()) {
-      file.delete();
-    }
-
-    file = new File(unSeqTsFileName1);
-    if (file.exists()) {
-      file.delete();
+    for (String fileName : delFileList) {
+      File file = new File(fileName);
+      if (file.exists()) {
+        file.delete();
+      }
     }
   }
 
@@ -107,6 +122,14 @@ public class PipeOpManagerTest {
     tsFileWriter.write(tsRecord);
     tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once
 
+    tsRecord = new TSRecord(1617206403004L, "root.lemming.device3");
+    dPoint1 = new FloatDataPoint("sensor1", 4.1f);
+    dPoint2 = new IntDataPoint("sensor2", 42);
+    tsRecord.addTuple(dPoint1);
+    tsRecord.addTuple(dPoint2);
+    tsFileWriter.write(tsRecord);
+    tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once
+
     // close TsFile
     tsFileWriter.close();
   }
@@ -145,9 +168,9 @@ public class PipeOpManagerTest {
     tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once
 
     tsRecord = new TSRecord(1617206403003L, "root2.lemming.device3");
-    dPoint1 = new FloatDataPoint("sensor1", 3.1f);
-    dPoint2 = new IntDataPoint("sensor2", 32);
-    dPoint3 = new IntDataPoint("sensor3", 33);
+    dPoint1 = new FloatDataPoint("sensor1", 33.1f);
+    dPoint2 = new IntDataPoint("sensor2", 332);
+    dPoint3 = new IntDataPoint("sensor3", 333);
     tsRecord.addTuple(dPoint1);
     tsRecord.addTuple(dPoint2);
     tsRecord.addTuple(dPoint3);
@@ -155,9 +178,9 @@ public class PipeOpManagerTest {
     tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once
 
     tsRecord = new TSRecord(1617206403004L, "root2.lemming.device3");
-    dPoint1 = new FloatDataPoint("sensor1", 4.1f);
-    dPoint2 = new IntDataPoint("sensor2", 42);
-    dPoint3 = new IntDataPoint("sensor3", 43);
+    dPoint1 = new FloatDataPoint("sensor1", 44.1f);
+    dPoint2 = new IntDataPoint("sensor2", 442);
+    dPoint3 = new IntDataPoint("sensor3", 443);
     tsRecord.addTuple(dPoint1);
     tsRecord.addTuple(dPoint2);
     tsRecord.addTuple(dPoint3);
@@ -168,8 +191,45 @@ public class PipeOpManagerTest {
     tsFileWriter.close();
   }
 
+  private void creatSeqModsFile(String modsFilePath) throws 
IllegalPathException {
+    Modification[] modifications =
+        new Modification[] {
+          new Deletion(new PartialPath("root.lemming.device2.sensor2"), 2, 
1617206403002L),
+          new Deletion(
+              new PartialPath("root.lemming.device3.sensor1"), 3, 
1617206403003L, 1617206403009L),
+        };
+
+    try (ModificationFile mFile = new ModificationFile(modsFilePath)) {
+      for (Modification mod : modifications) {
+        mFile.write(mod);
+      }
+    } catch (IOException e) {
+      fail(e.getMessage());
+    } finally {;
+    }
+  }
+
+  private void creatUnSeqModsFile(String modsFilePath) throws 
IllegalPathException {
+    Modification[] modifications =
+        new Modification[] {
+          new Deletion(new PartialPath("root2.lemming.device1.sensor1"), 2, 
1617206403001L),
+          new Deletion(new PartialPath("root2.lemming.device2.*"), 3, 2, 
Long.MAX_VALUE),
+          new Deletion(
+              new PartialPath("root1.lemming.**"), 3, 2, Long.MAX_VALUE), // 
useless entry for root1
+        };
+
+    try (ModificationFile mFile = new ModificationFile(modsFilePath)) {
+      for (Modification mod : modifications) {
+        mFile.write(mod);
+      }
+    } catch (IOException e) {
+      fail(e.getMessage());
+    } finally {
+    }
+  }
+
   @Test(timeout = 10_000L)
-  public void testPipSrcManager() throws IOException {
+  public void testOpManager() throws IOException {
     PipeOpManager pipeOpManager = new PipeOpManager(null);
 
     String sgName1 = "root1";
@@ -181,7 +241,7 @@ public class PipeOpManagerTest {
     pipeOpManager.appendDataSrc(sgName2, tsFileOpBlock2);
 
     long count1 = tsFileOpBlock1.getDataCount();
-    assertEquals(6, count1);
+    assertEquals(8, count1);
     for (int i = 0; i < count1; i++) {
       Operation operation = pipeOpManager.getOperation(sgName1, i, 8);
       System.out.println("=== data" + i + ": " + operation + ", "); //
@@ -189,10 +249,13 @@ public class PipeOpManagerTest {
     }
 
     Operation operation = pipeOpManager.getOperation(sgName1, 0, 18);
+    InsertOperation insertOperation = (InsertOperation) operation;
     System.out.println("+++ data10" + ": " + operation + ", ");
+    assertEquals(
+        "root.lemming.device1.sensor1", 
insertOperation.getDataList().get(0).left.toString());
 
     pipeOpManager.commitData(sgName1, count1 - 1);
-    operation = pipeOpManager.getOperation(sgName1, 7, 18);
+    operation = pipeOpManager.getOperation(sgName1, 9, 18);
     System.out.println("+++ data11" + ": " + operation + ", ");
     assertNull(operation);
 
@@ -200,8 +263,133 @@ public class PipeOpManagerTest {
     System.out.println("+++ data12" + ": " + operation + ", ");
     assertEquals(4, operation.getDataCount());
 
-    InsertOperation insertOperation = (InsertOperation) operation;
+    insertOperation = (InsertOperation) operation;
+    assertEquals(
+        "root2.lemming.device3.sensor3", 
insertOperation.getDataList().get(0).left.toString());
     assertEquals(1617206403003L, 
insertOperation.getDataList().get(0).right.get(0).getTimestamp());
-    assertEquals("33", 
insertOperation.getDataList().get(0).right.get(0).getValue().toString());
+    assertEquals("333", 
insertOperation.getDataList().get(0).right.get(0).getValue().toString());
+  }
+
+  @Test // (timeout = 10_000L)
+  public void testOpManager_Mods() throws IOException {
+    PipeOpManager pipeOpManager = new PipeOpManager(null);
+
+    String sgName1 = "root1";
+    // String sgName2 = "root2";
+
+    TsFileOpBlock tsFileOpBlock1 = new TsFileOpBlock(sgName1, seqTsFileName1, 
seqModsFileName1, 1);
+    pipeOpManager.appendDataSrc(sgName1, tsFileOpBlock1);
+    TsFileOpBlock tsFileOpBlock2 =
+        new TsFileOpBlock(sgName1, unSeqTsFileName1, unSeqModsFileName1, 2);
+    pipeOpManager.appendDataSrc(sgName1, tsFileOpBlock2);
+
+    long count1 = tsFileOpBlock1.getDataCount();
+    assertEquals(8, count1);
+    for (int i = 0; i < 18; i++) {
+      Operation operation = pipeOpManager.getOperation(sgName1, i, 8);
+      assertEquals(sgName1, operation.getStorageGroup());
+    }
+
+    // == test batch data in TsFile1 + .mods
+    Operation operation = pipeOpManager.getOperation(sgName1, 0, 18);
+    assertEquals(8, operation.getDataCount());
+
+    InsertOperation insertOperation = (InsertOperation) operation;
+    int i = 0;
+    assertEquals(1617206403001L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("1.1", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    i = 1;
+    assertEquals(1617206403001L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("12", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    i = 2;
+    assertEquals(1617206403001L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("13", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    i = 3;
+    assertEquals(1, insertOperation.getDataList().get(i).right.size());
+    assertNull(insertOperation.getDataList().get(i).right.get(0));
+
+    i = 4;
+    assertEquals(1, insertOperation.getDataList().get(i).right.size());
+    assertNull(insertOperation.getDataList().get(i).right.get(0));
+
+    i = 5;
+    assertEquals(1617206403003L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("32", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    i = 6;
+    assertEquals(1, insertOperation.getDataList().get(i).right.size());
+    assertNull(insertOperation.getDataList().get(i).right.get(0));
+
+    i = 7;
+    assertEquals(1617206403004L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("42", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    // == test batch data in TsFile2 + mods
+    operation = pipeOpManager.getOperation(sgName1, 8, 18);
+    assertEquals(10, operation.getDataCount());
+
+    insertOperation = (InsertOperation) operation;
+    i = 0;
+    assertEquals(
+        "root2.lemming.device1.sensor1", 
insertOperation.getDataList().get(i).left.toString());
+    assertEquals(1, insertOperation.getDataList().get(i).right.size());
+    assertNull(insertOperation.getDataList().get(i).right.get(0));
+
+    i = 1;
+    assertEquals(
+        "root2.lemming.device1.sensor2", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(1617206403001L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("12", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    i = 2;
+    assertEquals(
+        "root2.lemming.device1.sensor3", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(1617206403001L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("13", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    i = 3;
+    assertEquals(
+        "root2.lemming.device2.sensor2", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(1, insertOperation.getDataList().get(i).right.size());
+    assertNull(insertOperation.getDataList().get(i).right.get(0));
+
+    i = 4;
+    assertEquals(
+        "root2.lemming.device3.sensor1", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(1617206403003L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("33.1", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    i = 5;
+    assertEquals(
+        "root2.lemming.device3.sensor2", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(1617206403003L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("332", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    i = 6;
+    assertEquals(
+        "root2.lemming.device3.sensor3", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(1617206403003L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("333", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    i = 7;
+    assertEquals(
+        "root2.lemming.device3.sensor1", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(1617206403004L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("44.1", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    i = 8;
+    assertEquals(
+        "root2.lemming.device3.sensor2", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(1617206403004L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("442", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    i = 9;
+    assertEquals(
+        "root2.lemming.device3.sensor3", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(1617206403004L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("443", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
   }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/sync/datasource/TsFileOpBlockTest.java
 
b/server/src/test/java/org/apache/iotdb/db/sync/datasource/TsFileOpBlockTest.java
index 2c29effdc0..67571bd83d 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/sync/datasource/TsFileOpBlockTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/sync/datasource/TsFileOpBlockTest.java
@@ -19,6 +19,11 @@
 
 package org.apache.iotdb.db.sync.datasource;
 
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.sync.externalpipe.operation.InsertOperation;
 import org.apache.iotdb.db.sync.externalpipe.operation.Operation;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -37,29 +42,53 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
 
 public class TsFileOpBlockTest {
 
-  public static final String TMP_DIR = "target";
-  private static final String tsFileName1 = TMP_DIR + File.separator + 
"test1.tsfile";
-  public static final String DEFAULT_TEMPLATE = "template";
+  public final String TMP_DIR = "target";
+  private final String tsFileName1 = TMP_DIR + File.separator + "test1.tsfile";
+  private final String tsFileName2 = TMP_DIR + File.separator + "test2.tsfile";
+  private final String modsFileName2 = tsFileName2 + ".mods";
+  private final String tsFileName3 = TMP_DIR + File.separator + "test3.tsfile";
+  private final String modsFileName3 = tsFileName3 + ".mods";
+  public final List<String> fileNameList = new LinkedList<>();
+
+  public final String DEFAULT_TEMPLATE = "template";
 
   @Before
   public void prepareTestData() throws Exception {
-    createTsfile(tsFileName1);
+    createTsfile1(tsFileName1);
+    fileNameList.add(tsFileName1);
+
+    createTsfile2(tsFileName2);
+    fileNameList.add(tsFileName2);
+    creatModsFile2(modsFileName2);
+    fileNameList.add(modsFileName2);
+
+    createTsfile2(tsFileName3);
+    fileNameList.add(tsFileName3);
+    creatModsFile3(modsFileName3);
+    fileNameList.add(modsFileName3);
   }
 
   @After
   public void removeTestData() throws Exception {
-    File file = new File(tsFileName1);
-    if (file.exists()) {
-      file.delete();
+    for (String fileName : fileNameList) {
+      File file = new File(fileName);
+      if (file.exists()) {
+        file.delete();
+      }
     }
   }
 
-  private void createTsfile(String tsfilePath) throws Exception {
+  private void createTsfile1(String tsfilePath) throws Exception {
     File file = new File(tsfilePath);
     if (file.exists()) {
       file.delete();
@@ -105,7 +134,7 @@ public class TsFileOpBlockTest {
   }
 
   @Test(timeout = 10_000L)
-  public void testSingleReadEntry() throws IOException {
+  public void testOpBlock() throws IOException {
     TsFileOpBlock tsFileOpBlock = new TsFileOpBlock("root", tsFileName1, 0);
 
     assertEquals("root", tsFileOpBlock.getStorageGroup());
@@ -132,10 +161,12 @@ public class TsFileOpBlockTest {
     }
 
     InsertOperation insertOperation = (InsertOperation) operation;
+
+    int k = 0;
     assertEquals(
-        "root.lemming.device3.sensor2", 
insertOperation.getDataList().get(0).left.getFullPath());
-    assertEquals(1617206403003L, 
insertOperation.getDataList().get(0).right.get(0).getTimestamp());
-    assertEquals("32", 
insertOperation.getDataList().get(0).right.get(0).getValue().toString());
+        "root.lemming.device3.sensor2", 
insertOperation.getDataList().get(k).left.getFullPath());
+    assertEquals(1617206403003L, 
insertOperation.getDataList().get(k).right.get(0).getTimestamp());
+    assertEquals("32", 
insertOperation.getDataList().get(k).right.get(0).getValue().toString());
 
     for (int i = 0; i <= tsFileOpBlock.getDataCount() - 3; i++) {
       operation = tsFileOpBlock.getOperation(i + 2, 3);
@@ -157,4 +188,321 @@ public class TsFileOpBlockTest {
 
     tsFileOpBlock.close();
   }
+
+  // == test TsFile + .mods
+
+  private void createTsfile2(String tsfilePath) throws Exception {
+    File file = new File(tsfilePath);
+    if (file.exists()) {
+      file.delete();
+    }
+
+    Schema schema = new Schema();
+    schema.extendTemplate(
+        DEFAULT_TEMPLATE, new MeasurementSchema("sensor1", TSDataType.FLOAT, 
TSEncoding.RLE));
+    schema.extendTemplate(
+        DEFAULT_TEMPLATE, new MeasurementSchema("sensor2", TSDataType.INT32, 
TSEncoding.TS_2DIFF));
+    schema.extendTemplate(
+        DEFAULT_TEMPLATE, new MeasurementSchema("sensor3", TSDataType.INT32, 
TSEncoding.TS_2DIFF));
+
+    TsFileWriter tsFileWriter = new TsFileWriter(file, schema);
+
+    // construct TSRecord
+    TSRecord tsRecord = new TSRecord(1617206403001L, "root.lemming.device1");
+    DataPoint dPoint1 = new FloatDataPoint("sensor1", 1.1f);
+    DataPoint dPoint2 = new IntDataPoint("sensor2", 12);
+    DataPoint dPoint3 = new IntDataPoint("sensor3", 13);
+    tsRecord.addTuple(dPoint1);
+    tsRecord.addTuple(dPoint2);
+    tsRecord.addTuple(dPoint3);
+    tsFileWriter.write(tsRecord);
+    tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once
+
+    tsRecord = new TSRecord(1617206403002L, "root.lemming.device2");
+    dPoint2 = new IntDataPoint("sensor2", 22);
+    tsRecord.addTuple(dPoint2);
+    tsFileWriter.write(tsRecord);
+    tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once
+
+    tsRecord = new TSRecord(1617206403003L, "root.lemming.device3");
+    dPoint1 = new FloatDataPoint("sensor1", 3.1f);
+    dPoint2 = new IntDataPoint("sensor2", 32);
+    tsRecord.addTuple(dPoint1);
+    tsRecord.addTuple(dPoint2);
+    tsFileWriter.write(tsRecord);
+    tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once
+
+    tsRecord = new TSRecord(1617206403004L, "root.lemming.device1");
+    dPoint1 = new FloatDataPoint("sensor1", 4.1f);
+    dPoint2 = new IntDataPoint("sensor2", 42);
+    dPoint3 = new IntDataPoint("sensor3", 43);
+    tsRecord.addTuple(dPoint1);
+    tsRecord.addTuple(dPoint2);
+    tsRecord.addTuple(dPoint3);
+    tsFileWriter.write(tsRecord);
+    tsFileWriter.flushAllChunkGroups(); // flush above data to disk at once
+
+    // close TsFile
+    tsFileWriter.close();
+  }
+
+  private void creatModsFile2(String modsFilePath) throws IllegalPathException 
{
+    Modification[] modifications =
+        new Modification[] {
+          // new Deletion(new PartialPath(new String[] {"d1", "s2"}), 1, 2),
+          new Deletion(new PartialPath("root.lemming.device1.sensor1"), 2, 1),
+          new Deletion(new PartialPath("root.lemming.device1.sensor1"), 3, 2, 
5),
+          new Deletion(new PartialPath("root.lemming.**"), 11, 1, 
Long.MAX_VALUE)
+        };
+
+    try (ModificationFile mFile = new ModificationFile(modsFilePath)) {
+      for (Modification mod : modifications) {
+        mFile.write(mod);
+      }
+    } catch (IOException e) {
+      fail(e.getMessage());
+    } finally {;
+    }
+  }
+
+  private void creatModsFile3(String modsFilePath) throws IllegalPathException 
{
+    Modification[] modifications =
+        new Modification[] {
+          new Deletion(new PartialPath("root.lemming.device1.sensor1"), 2, 
1617206403001L),
+          new Deletion(new PartialPath("root.lemming.device2.*"), 3, 2, 
Long.MAX_VALUE),
+        };
+
+    try (ModificationFile mFile = new ModificationFile(modsFilePath)) {
+      for (Modification mod : modifications) {
+        mFile.write(mod);
+      }
+    } catch (IOException e) {
+      fail(e.getMessage());
+    } finally {;
+    }
+  }
+
+  @Test(timeout = 10_000L)
+  public void testOpBlockMods2() throws IOException {
+
+    List<Modification> modificationList = null;
+    try (ModificationFile mFile = new ModificationFile(modsFileName2)) {
+      modificationList = (List<Modification>) mFile.getModifications();
+    }
+    // System.out.println("=== data: " + modificationList);
+
+    TsFileOpBlock tsFileOpBlock = new TsFileOpBlock("root", tsFileName2, 
modsFileName2, 0);
+
+    assertEquals("root", tsFileOpBlock.getStorageGroup());
+    assertEquals(0, tsFileOpBlock.getBeginIndex());
+    assertEquals(9, tsFileOpBlock.getDataCount());
+    assertEquals(9, tsFileOpBlock.getNextIndex());
+
+    // == check setBeginIndex()
+    tsFileOpBlock.setBeginIndex(55);
+    assertEquals(64, tsFileOpBlock.getNextIndex());
+
+    // == check result before and after calling tsFileOpBlock.getOperation()
+    assertNull(tsFileOpBlock.getFullPathToDeletionMap());
+    assertNull(tsFileOpBlock.getModificationList());
+    Operation operation = tsFileOpBlock.getOperation(55, 1);
+    ;
+    assertNotNull(tsFileOpBlock.getFullPathToDeletionMap());
+    assertEquals(modificationList, tsFileOpBlock.getModificationList());
+    assertEquals(9, tsFileOpBlock.getDataCount());
+
+    // == check tsFileOpBlock.getOperation()
+    for (int i = 0; i < tsFileOpBlock.getDataCount(); i++) {
+      operation = tsFileOpBlock.getOperation(i + 55, 1);
+      assertEquals("root", operation.getStorageGroup());
+      assertEquals(1, operation.getDataCount());
+      assertEquals(i + 55, operation.getStartIndex());
+      assertEquals(i + 56, operation.getEndIndex());
+
+      assertEquals(true, operation instanceof InsertOperation);
+      InsertOperation insertOperation = (InsertOperation) operation;
+      assertEquals(1, insertOperation.getDataList().size());
+      // System.out.println("=== data" + i + ": " + operation + 
((InsertOperation)
+      // operation).getDataList());
+    }
+
+    // == check deleted data caused by .mods file
+    operation = tsFileOpBlock.getOperation(55, 15);
+    assertEquals(9, operation.getDataCount());
+    InsertOperation insertOperation = (InsertOperation) operation;
+
+    int i = 0;
+    assertEquals(
+        "root.lemming.device1.sensor1", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(null, insertOperation.getDataList().get(i).right.get(0));
+
+    i = 1;
+    assertEquals(
+        "root.lemming.device1.sensor2", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(null, insertOperation.getDataList().get(i).right.get(0));
+
+    i = 2;
+    assertEquals(
+        "root.lemming.device1.sensor3", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(null, insertOperation.getDataList().get(i).right.get(0));
+
+    i = 3;
+    assertEquals(
+        "root.lemming.device2.sensor2", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(null, insertOperation.getDataList().get(i).right.get(0));
+
+    i = 4;
+    assertEquals(
+        "root.lemming.device3.sensor1", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(null, insertOperation.getDataList().get(i).right.get(0));
+
+    i = 5;
+    assertEquals(
+        "root.lemming.device3.sensor2", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(null, insertOperation.getDataList().get(i).right.get(0));
+
+    // assertEquals(1617206403003L,
+    // insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    // assertEquals("32", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    tsFileOpBlock.close();
+  }
+
+  @Test(timeout = 10_000L)
+  public void testOpBlockMods3() throws IOException {
+
+    List<Modification> modificationList = null;
+    try (ModificationFile mFile = new ModificationFile(modsFileName3)) {
+      modificationList = (List<Modification>) mFile.getModifications();
+    }
+
+    TsFileOpBlock tsFileOpBlock = new TsFileOpBlock("root", tsFileName2, 
modsFileName3, 0);
+
+    assertEquals("root", tsFileOpBlock.getStorageGroup());
+    assertEquals(0, tsFileOpBlock.getBeginIndex());
+    assertEquals(9, tsFileOpBlock.getDataCount());
+    assertEquals(9, tsFileOpBlock.getNextIndex());
+
+    // == check setBeginIndex()
+    tsFileOpBlock.setBeginIndex(55);
+    assertEquals(64, tsFileOpBlock.getNextIndex());
+
+    // == check result before and after calling tsFileOpBlock.getOperation()
+    assertNull(tsFileOpBlock.getFullPathToDeletionMap());
+    assertNull(tsFileOpBlock.getModificationList());
+    Operation operation = tsFileOpBlock.getOperation(55, 1);
+
+    assertNotNull(tsFileOpBlock.getFullPathToDeletionMap());
+    assertEquals(modificationList, tsFileOpBlock.getModificationList());
+    assertEquals(9, tsFileOpBlock.getDataCount());
+
+    // == check tsFileOpBlock.getOperation()
+    for (int i = 0; i < tsFileOpBlock.getDataCount(); i++) {
+      operation = tsFileOpBlock.getOperation(i + 55, 1);
+      assertEquals("root", operation.getStorageGroup());
+      assertEquals(1, operation.getDataCount());
+      assertEquals(i + 55, operation.getStartIndex());
+      assertEquals(i + 56, operation.getEndIndex());
+
+      assertEquals(true, operation instanceof InsertOperation);
+      InsertOperation insertOperation = (InsertOperation) operation;
+      assertEquals(1, insertOperation.getDataList().size());
+      // System.out.println("=== data" + i + ": " + operation + 
((InsertOperation)
+      // operation).getDataList());
+    }
+
+    // == check deleted data caused by .mods file
+    operation = tsFileOpBlock.getOperation(55, 20);
+    assertEquals(9, operation.getDataCount());
+    InsertOperation insertOperation = (InsertOperation) operation;
+
+    int i = 0;
+    assertEquals(
+        "root.lemming.device1.sensor1", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(null, insertOperation.getDataList().get(i).right.get(0));
+
+    i = 1;
+    assertEquals(
+        "root.lemming.device1.sensor2", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(1617206403001L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("12", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    i = 2;
+    assertEquals(
+        "root.lemming.device1.sensor3", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(1617206403001L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("13", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    i = 3;
+    assertEquals(
+        "root.lemming.device2.sensor2", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(null, insertOperation.getDataList().get(i).right.get(0));
+
+    i = 4;
+    assertEquals(
+        "root.lemming.device3.sensor1", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(1617206403003L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("3.1", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    i = 5;
+    assertEquals(
+        "root.lemming.device3.sensor2", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(1617206403003L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("32", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    i = 6;
+    assertEquals(
+        "root.lemming.device1.sensor1", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(1617206403004L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("4.1", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    i = 7;
+    assertEquals(
+        "root.lemming.device1.sensor2", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(1617206403004L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("42", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    i = 8;
+    assertEquals(
+        "root.lemming.device1.sensor3", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(1617206403004L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("43", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    // == test getting old data and page cache
+    operation = tsFileOpBlock.getOperation(59, 20);
+    assertEquals(5, operation.getDataCount());
+    insertOperation = (InsertOperation) operation;
+
+    i = 0;
+    assertEquals(
+        "root.lemming.device3.sensor1", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(1617206403003L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("3.1", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    i = 1;
+    assertEquals(
+        "root.lemming.device3.sensor2", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(1617206403003L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("32", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    i = 2;
+    assertEquals(
+        "root.lemming.device1.sensor1", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(1617206403004L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("4.1", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    i = 3;
+    assertEquals(
+        "root.lemming.device1.sensor2", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(1617206403004L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("42", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    i = 4;
+    assertEquals(
+        "root.lemming.device1.sensor3", 
insertOperation.getDataList().get(i).left.getFullPath());
+    assertEquals(1617206403004L, 
insertOperation.getDataList().get(i).right.get(0).getTimestamp());
+    assertEquals("43", 
insertOperation.getDataList().get(i).right.get(0).getValue().toString());
+
+    tsFileOpBlock.close();
+  }
 }

Reply via email to