This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch pr_1758
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6452379f5c1f0e5c7e14325e46a4324007519115
Author: 张凌哲 <[email protected]>
AuthorDate: Sun Oct 4 17:42:20 2020 +0800

    add unseq merge config and logic with instance unseq merge
---
 .../resources/conf/iotdb-engine.properties         |  16 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  35 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  10 +
 .../engine/storagegroup/StorageGroupProcessor.java | 426 +++++----------------
 .../engine/tsfilemanagement/TsFileManagement.java  | 230 +++++++++++
 .../level/LevelTsFileManagement.java               | 230 +++++------
 .../iotdb/db/engine/merge/MergeManagerTest.java    |   4 +-
 .../storagegroup/StorageGroupProcessorTest.java    | 327 +---------------
 .../iotdb/db/integration/IoTDBMergeTest.java       |   2 +-
 9 files changed, 527 insertions(+), 753 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties 
b/server/src/assembly/resources/conf/iotdb-engine.properties
index 2e6577b..97ded95 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -260,12 +260,20 @@ default_fill_interval=-1
 tsfile_manage_strategy=NORMAL_STRATEGY
 
 # Work when tsfile_manage_strategy is level_strategy.
-# The max file num of each level. When file num exceeds this, the files in one 
level will merge to one.
-max_file_num_in_each_level=100
+# The max seq file num of each level. When file num exceeds this, the files in 
one level will merge to one.
+max_file_num_in_each_level=10
 
 # Work when tsfile_manage_strategy is level_strategy.
-# The max num of level.
-max_level_num=2
+# The max num of seq level.
+max_level_num=4
+
+# Work when tsfile_manage_strategy is level_strategy.
+# The max unseq file num of each level. When file num exceeds this, the files 
in one level will merge to one.
+max_unseq_file_num_in_each_level=10
+
+# Work when tsfile_manage_strategy is level_strategy.
+# The max num of unseq level.
+max_unseq_level_num=2
 
 # Work when tsfile_manage_strategy is level_strategy.
 # When merge point number reaches this, merge the files to the last level.
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 54cf9e3..5322c07 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -266,15 +266,26 @@ public class IoTDBConfig {
   private TsFileManagementStrategy tsFileManagementStrategy = 
TsFileManagementStrategy.NORMAL_STRATEGY;
 
   /**
-   * Work when tsfile_manage_strategy is level_strategy. The max file num of 
each level. When file
+   * Work when tsfile_manage_strategy is level_strategy. The max seq file num 
of each level. When file
    * num exceeds this, the files in one level will merge to one.
    */
-  private int maxFileNumInEachLevel = 100;
+  private int maxFileNumInEachLevel = 10;
 
   /**
-   * Work when tsfile_manage_strategy is level_strategy. The max num of level.
+   * Work when tsfile_manage_strategy is level_strategy. The max num of seq 
level.
    */
-  private int maxLevelNum = 2;
+  private int maxLevelNum = 4;
+
+  /**
+   * Work when tsfile_manage_strategy is level_strategy. The max unseq file 
num of each level. When file
+   * num exceeds this, the files in one level will merge to one.
+   */
+  private int maxUnseqFileNumInEachLevel = 10;
+
+  /**
+   * Work when tsfile_manage_strategy is level_strategy. The max num of unseq 
level.
+   */
+  private int maxUnseqLevelNum = 2;
 
   /**
    * whether to cache meta data(ChunkMetaData and TsFileMetaData) or not.
@@ -1308,6 +1319,22 @@ public class IoTDBConfig {
     this.maxLevelNum = maxLevelNum;
   }
 
+  public int getMaxUnseqFileNumInEachLevel() {
+    return maxUnseqFileNumInEachLevel;
+  }
+
+  public void setMaxUnseqFileNumInEachLevel(int maxUnseqFileNumInEachLevel) {
+    this.maxUnseqFileNumInEachLevel = maxUnseqFileNumInEachLevel;
+  }
+
+  public int getMaxUnseqLevelNum() {
+    return maxUnseqLevelNum;
+  }
+
+  public void setMaxUnseqLevelNum(int maxUnseqLevelNum) {
+    this.maxUnseqLevelNum = maxUnseqLevelNum;
+  }
+
   public int getMergeChunkSubThreadNum() {
     return mergeChunkSubThreadNum;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index fbeccfd..b77f4d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -287,6 +287,14 @@ public class IoTDBDescriptor {
           .getProperty("max_file_num_in_each_level",
               Integer.toString(conf.getMaxFileNumInEachLevel()))));
 
+      conf.setMaxUnseqLevelNum(Integer.parseInt(properties
+          .getProperty("max_unseq_level_num",
+              Integer.toString(conf.getMaxUnseqLevelNum()))));
+
+      conf.setMaxUnseqFileNumInEachLevel(Integer.parseInt(properties
+          .getProperty("max_unseq_file_num_in_each_level",
+              Integer.toString(conf.getMaxUnseqFileNumInEachLevel()))));
+
       conf.setSyncEnable(Boolean
           .parseBoolean(properties.getProperty("is_sync_enable",
               Boolean.toString(conf.isSyncEnable()))));
@@ -357,6 +365,8 @@ public class IoTDBDescriptor {
           Boolean.toString(conf.isForceFullMerge()))));
       conf.setChunkMergePointThreshold(Integer.parseInt(properties.getProperty(
           "chunk_merge_point_threshold", 
Integer.toString(conf.getChunkMergePointThreshold()))));
+      conf.setMergeThroughputMbPerSec(Integer.parseInt(properties.getProperty(
+          "merge_throughput_mb_per_sec", 
Integer.toString(conf.getMergeThroughputMbPerSec()))));
 
       conf.setEnablePartialInsert(
           Boolean.parseBoolean(properties.getProperty("enable_partial_insert",
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index bf09d1b..5038d00 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -49,15 +49,8 @@ import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
-import org.apache.iotdb.db.engine.merge.manage.MergeResource;
-import org.apache.iotdb.db.engine.merge.selector.IMergeFileSelector;
-import org.apache.iotdb.db.engine.merge.selector.MaxFileMergeFileSelector;
-import org.apache.iotdb.db.engine.merge.selector.MaxSeriesMergeFileSelector;
-import org.apache.iotdb.db.engine.merge.selector.MergeFileStrategy;
-import org.apache.iotdb.db.engine.merge.task.MergeTask;
 import org.apache.iotdb.db.engine.merge.task.RecoverMergeTask;
 import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import 
org.apache.iotdb.db.engine.tsfilemanagement.HotCompactionMergeTaskPoolManager;
@@ -67,7 +60,6 @@ import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.BatchInsertionException;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 import org.apache.iotdb.db.exception.LoadFileException;
-import org.apache.iotdb.db.exception.MergeException;
 import org.apache.iotdb.db.exception.StorageGroupProcessorException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessException;
@@ -123,8 +115,7 @@ import org.slf4j.LoggerFactory;
  */
 public class StorageGroupProcessor {
 
-  private static final String MERGING_MODIFICATION_FILE_NAME = "merge.mods";
-  private static final String FAIL_TO_UPGRADE_FOLDER = "Failed to move {} to 
upgrade folder";
+  public static final String MERGING_MODIFICATION_FILE_NAME = "merge.mods";
 
   /**
    * All newly generated chunks after merge have version number 0, so we set 
merged Modification
@@ -154,6 +145,11 @@ public class StorageGroupProcessor {
    */
   private final Object closeStorageGroupCondition = new Object();
   /**
+   * hotCompactionMergeWorking is used to wait for last hot compaction to be 
done.
+   */
+  private volatile boolean hotCompactionMergeWorking = false;
+
+  /**
    * avoid some tsfileResource is changed (e.g., from unsealed to sealed) when 
a query is executed.
    */
   private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock();
@@ -165,10 +161,7 @@ public class StorageGroupProcessor {
    * time partition id in the storage group -> tsFileProcessor for this time 
partition
    */
   private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors 
= new TreeMap<>();
-  /**
-   * hotCompactionMergeWorking is used to wait for last hot compaction to be 
done.
-   */
-  private volatile boolean hotCompactionMergeWorking = false;
+
   // upgrading sequence TsFile resource list
   private List<TsFileResource> upgradeSeqFileList = new LinkedList<>();
 
@@ -208,7 +201,7 @@ public class StorageGroupProcessor {
   private File storageGroupSysDir;
 
   // manage seqFileList and unSeqFileList
-  private TsFileManagement tsFileManagement;
+  public TsFileManagement tsFileManagement;
 
   /**
    * time partition id -> version controller which assigns a version for each 
MemTable and
@@ -216,18 +209,7 @@ public class StorageGroupProcessor {
    * updates can be re-determined.
    */
   private HashMap<Long, VersionController> timePartitionIdVersionControllerMap 
= new HashMap<>();
-  /**
-   * mergeLock is to be used in the merge process. Concurrent queries, 
deletions and merges may
-   * result in losing some deletion in the merged new file, so a lock is 
necessary.
-   */
-  private ReentrantReadWriteLock mergeLock = new ReentrantReadWriteLock();
-  /**
-   * This is the modification file of the result of the current merge. Because 
the merged file may
-   * be invisible at this moment, without this, deletion/update during merge 
could be lost.
-   */
-  private ModificationFile mergingModification;
-  private volatile boolean isMerging = false;
-  private long mergeStartTime;
+
   /**
    * when the data in a storage group is older than dataTTL, it is considered 
invalid and will be
    * eventually removed.
@@ -329,11 +311,11 @@ public class StorageGroupProcessor {
       File mergingMods = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir,
           MERGING_MODIFICATION_FILE_NAME);
       if (mergingMods.exists()) {
-        mergingModification = new ModificationFile(mergingMods.getPath());
+        this.tsFileManagement.mergingModification = new 
ModificationFile(mergingMods.getPath());
       }
       RecoverMergeTask recoverMergeTask = new RecoverMergeTask(
           new ArrayList<>(tsFileManagement.getTsFileList(true)),
-          tsFileManagement.getTsFileList(false), storageGroupSysDir.getPath(), 
this::mergeEndAction,
+          tsFileManagement.getTsFileList(false), storageGroupSysDir.getPath(), 
tsFileManagement::mergeEndAction,
           taskName,
           IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(), 
storageGroupName);
       logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, 
taskName);
@@ -476,19 +458,19 @@ public class StorageGroupProcessor {
         // move .tsfile to upgrade folder
         for (File file : oldTsfileArray) {
           if (!file.renameTo(fsFactory.getFile(upgradeFolder, 
file.getName()))) {
-            logger.error(FAIL_TO_UPGRADE_FOLDER, file);
+            logger.error("Failed to move {} to upgrade folder", file);
           }
         }
         // move .resource to upgrade folder
         for (File file : oldResourceFileArray) {
           if (!file.renameTo(fsFactory.getFile(upgradeFolder, 
file.getName()))) {
-            logger.error(FAIL_TO_UPGRADE_FOLDER, file);
+            logger.error("Failed to move {} to upgrade folder", file);
           }
         }
         // move .mods to upgrade folder
         for (File file : oldModificationFileArray) {
           if (!file.renameTo(fsFactory.getFile(upgradeFolder, 
file.getName()))) {
-            logger.error(FAIL_TO_UPGRADE_FOLDER, file);
+            logger.error("Failed to move {} to upgrade folder", file);
           }
         }
 
@@ -624,22 +606,14 @@ public class StorageGroupProcessor {
       // init map
       long timePartitionId = 
StorageEngine.getTimePartition(insertRowPlan.getTime());
 
+      latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new 
HashMap<>());
       partitionLatestFlushedTimeForEachDevice
           .computeIfAbsent(timePartitionId, id -> new HashMap<>());
 
-      boolean isSequence =
-          insertRowPlan.getTime() > 
partitionLatestFlushedTimeForEachDevice.get(timePartitionId)
-              .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), 
Long.MIN_VALUE);
-
-      //is unsequence and user set config to discard out of order data
-      if (!isSequence && IoTDBDescriptor.getInstance().getConfig()
-          .isEnableDiscardOutOfOrderData()) {
-        return;
-      }
-
-      latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new 
HashMap<>());
       // insert to sequence or unSequence file
-      insertToTsFileProcessor(insertRowPlan, isSequence);
+      insertToTsFileProcessor(insertRowPlan,
+          insertRowPlan.getTime() > 
partitionLatestFlushedTimeForEachDevice.get(timePartitionId)
+              .getOrDefault(insertRowPlan.getDeviceId().getFullPath(), 
Long.MIN_VALUE));
 
     } finally {
       writeUnlock();
@@ -696,12 +670,9 @@ public class StorageGroupProcessor {
         // start next partition
         if (curTimePartition != beforeTimePartition) {
           // insert last time partition
-          if (isSequence || !IoTDBDescriptor.getInstance().getConfig()
-              .isEnableDiscardOutOfOrderData()) {
-            noFailure = insertTabletToTsFileProcessor(insertTabletPlan, 
before, loc, isSequence,
-                results,
-                beforeTimePartition) && noFailure;
-          }
+          noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before, 
loc, isSequence,
+              results,
+              beforeTimePartition) && noFailure;
           // re initialize
           before = loc;
           beforeTimePartition = curTimePartition;
@@ -715,11 +686,8 @@ public class StorageGroupProcessor {
           // judge if we should insert sequence
           if (!isSequence && time > lastFlushTime) {
             // insert into unsequence and then start sequence
-            if 
(!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) {
-              noFailure =
-                  insertTabletToTsFileProcessor(insertTabletPlan, before, loc, 
false, results,
-                      beforeTimePartition) && noFailure;
-            }
+            noFailure = insertTabletToTsFileProcessor(insertTabletPlan, 
before, loc, false, results,
+                beforeTimePartition) && noFailure;
             before = loc;
             isSequence = true;
           }
@@ -728,8 +696,7 @@ public class StorageGroupProcessor {
       }
 
       // do not forget last part
-      if (before < loc && (isSequence || 
!IoTDBDescriptor.getInstance().getConfig()
-          .isEnableDiscardOutOfOrderData())) {
+      if (before < loc) {
         noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before, 
loc, isSequence,
             results, beforeTimePartition) && noFailure;
       }
@@ -757,11 +724,11 @@ public class StorageGroupProcessor {
    * inserted are in the range [start, end)
    *
    * @param insertTabletPlan insert a tablet of a device
-   * @param sequence         whether is sequence
-   * @param start            start index of rows to be inserted in 
insertTabletPlan
-   * @param end              end index of rows to be inserted in 
insertTabletPlan
-   * @param results          result array
-   * @param timePartitionId  time partition id
+   * @param sequence whether is sequence
+   * @param start start index of rows to be inserted in insertTabletPlan
+   * @param end end index of rows to be inserted in insertTabletPlan
+   * @param results result array
+   * @param timePartitionId time partition id
    * @return false if any failure occurs when inserting the tablet, true 
otherwise
    */
   private boolean insertTabletToTsFileProcessor(InsertTabletPlan 
insertTabletPlan,
@@ -804,9 +771,6 @@ public class StorageGroupProcessor {
   }
 
   private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long 
latestFlushedTime) {
-    if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
-      return;
-    }
     MeasurementMNode[] mNodes = plan.getMeasurementMNodes();
     for (int i = 0; i < mNodes.length; i++) {
       if (plan.getColumns()[i] == null) {
@@ -859,9 +823,6 @@ public class StorageGroupProcessor {
   }
 
   private void tryToUpdateInsertLastCache(InsertRowPlan plan, Long 
latestFlushedTime) {
-    if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
-      return;
-    }
     MeasurementMNode[] mNodes = plan.getMeasurementMNodes();
     for (int i = 0; i < mNodes.length; i++) {
       if (plan.getValues()[i] == null) {
@@ -907,9 +868,9 @@ public class StorageGroupProcessor {
   /**
    * get processor from hashmap, flush oldest processor if necessary
    *
-   * @param timeRangeId            time partition range
+   * @param timeRangeId time partition range
    * @param tsFileProcessorTreeMap tsFileProcessorTreeMap
-   * @param sequence               whether is sequence or not
+   * @param sequence whether is sequence or not
    */
   private TsFileProcessor getOrCreateTsFileProcessorIntern(long timeRangeId,
       TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap,
@@ -1074,11 +1035,12 @@ public class StorageGroupProcessor {
     syncCloseAllWorkingTsFileProcessors();
     //normally, mergingModification just needs to be closed after a merge task is finished.
     //we close it here just for IT test.
-    if (this.mergingModification != null) {
+    if (this.tsFileManagement.mergingModification != null) {
       try {
-        mergingModification.close();
+        this.tsFileManagement.mergingModification.close();
       } catch (IOException e) {
-        logger.error("Cannot close the mergingMod file {}", 
mergingModification.getFilePath(), e);
+        logger.error("Cannot close the mergingMod file {}",
+            this.tsFileManagement.mergingModification.getFilePath(), e);
       }
 
     }
@@ -1246,7 +1208,7 @@ public class StorageGroupProcessor {
   public QueryDataSource query(PartialPath deviceId, String measurementId, 
QueryContext context,
       QueryFileManager filePathsManager, Filter timeFilter) throws 
QueryProcessException {
     insertLock.readLock().lock();
-    mergeLock.readLock().lock();
+    tsFileManagement.mergeLock.readLock().lock();
     tsFileManagement.readLock();
     try {
       List<TsFileResource> seqResources = getFileResourceListForQuery(
@@ -1269,7 +1231,7 @@ public class StorageGroupProcessor {
       throw new QueryProcessException(e);
     } finally {
       tsFileManagement.readUnLock();
-      mergeLock.readLock().unlock();
+      tsFileManagement.mergeLock.readLock().unlock();
       insertLock.readLock().unlock();
     }
   }
@@ -1364,10 +1326,10 @@ public class StorageGroupProcessor {
    * Delete data whose timestamp <= 'timestamp' and belongs to the time series
    * deviceId.measurementId.
    *
-   * @param deviceId      the deviceId of the timeseries to be deleted.
+   * @param deviceId the deviceId of the timeseries to be deleted.
    * @param measurementId the measurementId of the timeseries to be deleted.
-   * @param startTime     the startTime of delete range.
-   * @param endTime       the endTime of delete range.
+   * @param startTime the startTime of delete range.
+   * @param endTime the endTime of delete range.
    */
   public void delete(PartialPath deviceId, String measurementId, long 
startTime, long endTime)
       throws IOException {
@@ -1375,7 +1337,7 @@ public class StorageGroupProcessor {
     // FIXME: notice that if we may remove a SGProcessor out of memory, we 
need to close all opened
     //mod files in mergingModification, sequenceFileList, and 
unsequenceFileList
     writeLock();
-    mergeLock.writeLock().lock();
+    tsFileManagement.mergeLock.writeLock().lock();
     tsFileManagement.writeLock();
 
     // record files which are updated so that we can roll back them in case of 
exception
@@ -1402,9 +1364,9 @@ public class StorageGroupProcessor {
       tryToDeleteLastCache(deviceId, measurementId, startTime, endTime);
       Deletion deletion = new Deletion(deviceId.concatNode(measurementId),
           MERGE_MOD_START_VERSION_NUM, startTime, endTime);
-      if (mergingModification != null) {
-        mergingModification.write(deletion);
-        updatedModFiles.add(mergingModification);
+      if (tsFileManagement.mergingModification != null) {
+        tsFileManagement.mergingModification.write(deletion);
+        updatedModFiles.add(tsFileManagement.mergingModification);
       }
 
       deleteDataInFiles(tsFileManagement.getTsFileList(true), deletion, 
updatedModFiles);
@@ -1418,7 +1380,7 @@ public class StorageGroupProcessor {
       throw new IOException(e);
     } finally {
       tsFileManagement.writeUnlock();
-      mergeLock.writeLock().unlock();
+      tsFileManagement.mergeLock.writeLock().unlock();
       writeUnlock();
     }
   }
@@ -1477,11 +1439,12 @@ public class StorageGroupProcessor {
 
   private void tryToDeleteLastCache(PartialPath deviceId, String 
measurementId, long startTime,
       long endTime) throws WriteProcessException {
+    MNode node = null;
     try {
       MManager manager = MManager.getInstance();
-      MNode node = manager.getDeviceNodeWithAutoCreate(deviceId);
+      node = manager.getDeviceNodeWithAutoCreateAndReadLock(deviceId);
 
-      MNode measurementNode = node.getChild(measurementId);
+      MNode measurementNode = manager.getChild(node, measurementId);
       if (measurementNode != null) {
         TimeValuePair lastPair = ((MeasurementMNode) 
measurementNode).getCachedLast();
         if (lastPair != null && startTime <= lastPair.getTimestamp()
@@ -1491,6 +1454,10 @@ public class StorageGroupProcessor {
       }
     } catch (MetadataException e) {
       throw new WriteProcessException(e);
+    } finally {
+      if (node != null) {
+        node.readUnlock();
+      }
     }
   }
 
@@ -1634,7 +1601,7 @@ public class StorageGroupProcessor {
       );
     }
     insertLock.writeLock().lock();
-    mergeLock.writeLock().lock();
+    tsFileManagement.mergeLock.writeLock().lock();
     tsFileManagement.writeLock();
     if (tsFileResource.isSeq()) {
       tsFileManagement.addAll(upgradedResources, true);
@@ -1644,7 +1611,7 @@ public class StorageGroupProcessor {
       upgradeUnseqFileList.remove(tsFileResource);
     }
     tsFileManagement.writeUnlock();
-    mergeLock.writeLock().unlock();
+    tsFileManagement.mergeLock.writeLock().unlock();
     insertLock.writeLock().unlock();
 
     // after upgrade complete, update partitionLatestFlushedTimeForEachDevice
@@ -1672,206 +1639,13 @@ public class StorageGroupProcessor {
   public void merge(boolean fullMerge) {
     writeLock();
     try {
-      if (isMerging) {
-        if (logger.isInfoEnabled()) {
-          logger.info("{} Last merge is ongoing, currently consumed time: 
{}ms", storageGroupName,
-              (System.currentTimeMillis() - mergeStartTime));
-        }
-        return;
-      }
-      logger.info("{} will close all files for starting a merge (fullmerge = 
{})", storageGroupName,
-          fullMerge);
-
-      List<TsFileResource> seqMergeList = 
tsFileManagement.getStableTsFileList(true);
-      List<TsFileResource> unSeqMergeList = 
tsFileManagement.getStableTsFileList(false);
-      if (seqMergeList.isEmpty() || unSeqMergeList.isEmpty()) {
-        logger.info("{} no files to be merged", storageGroupName);
-        return;
-      }
-
-      long budget = 
IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
-      long timeLowerBound = System.currentTimeMillis() - dataTTL;
-      MergeResource mergeResource = new MergeResource(seqMergeList, 
unSeqMergeList, timeLowerBound);
-
-      IMergeFileSelector fileSelector = getMergeFileSelector(budget, 
mergeResource);
-      try {
-        List[] mergeFiles = fileSelector.select();
-        if (mergeFiles.length == 0) {
-          logger.info("{} cannot select merge candidates under the budget {}", 
storageGroupName,
-              budget);
-          return;
-        }
-        // avoid pending tasks holds the metadata and streams
-        mergeResource.clear();
-        String taskName = storageGroupName + "-" + System.currentTimeMillis();
-        // do not cache metadata until true candidates are chosen, or too much 
metadata will be
-        // cached during selection
-        mergeResource.setCacheDeviceMeta(true);
-
-        for (TsFileResource tsFileResource : mergeResource.getSeqFiles()) {
-          tsFileResource.setMerging(true);
-        }
-        for (TsFileResource tsFileResource : mergeResource.getUnseqFiles()) {
-          tsFileResource.setMerging(true);
-        }
-
-        MergeTask mergeTask = new MergeTask(mergeResource, 
storageGroupSysDir.getPath(),
-            this::mergeEndAction, taskName, fullMerge, 
fileSelector.getConcurrentMergeNum(),
-            storageGroupName);
-        mergingModification = new ModificationFile(
-            storageGroupSysDir + File.separator + 
MERGING_MODIFICATION_FILE_NAME);
-        MergeManager.getINSTANCE().submitMainTask(mergeTask);
-        if (logger.isInfoEnabled()) {
-          logger.info("{} submits a merge task {}, merging {} seqFiles, {} 
unseqFiles",
-              storageGroupName, taskName, mergeFiles[0].size(), 
mergeFiles[1].size());
-        }
-        isMerging = true;
-        mergeStartTime = System.currentTimeMillis();
-
-      } catch (MergeException | IOException e) {
-        logger.error("{} cannot select file for merge", storageGroupName, e);
-      }
+      this.tsFileManagement.merge(fullMerge, 
tsFileManagement.getTsFileList(true),
+          tsFileManagement.getTsFileList(false), dataTTL);
     } finally {
       writeUnlock();
     }
   }
 
-  private IMergeFileSelector getMergeFileSelector(long budget, MergeResource 
resource) {
-    MergeFileStrategy strategy = 
IoTDBDescriptor.getInstance().getConfig().getMergeFileStrategy();
-    switch (strategy) {
-      case MAX_FILE_NUM:
-        return new MaxFileMergeFileSelector(resource, budget);
-      case MAX_SERIES_NUM:
-        return new MaxSeriesMergeFileSelector(resource, budget);
-      default:
-        throw new UnsupportedOperationException("Unknown MergeFileStrategy " + 
strategy);
-    }
-  }
-
-  private void removeUnseqFiles(List<TsFileResource> unseqFiles) {
-    mergeLock.writeLock().lock();
-    tsFileManagement.writeLock();
-    try {
-      tsFileManagement.removeAll(unseqFiles, false);
-    } finally {
-      tsFileManagement.writeUnlock();
-      mergeLock.writeLock().unlock();
-    }
-
-    for (TsFileResource unseqFile : unseqFiles) {
-      unseqFile.writeLock();
-      try {
-        unseqFile.remove();
-      } finally {
-        unseqFile.writeUnlock();
-      }
-    }
-  }
-
-  @SuppressWarnings("squid:S1141")
-  private void updateMergeModification(TsFileResource seqFile) {
-    try {
-      // remove old modifications and write modifications generated during 
merge
-      seqFile.removeModFile();
-      if (mergingModification != null) {
-        for (Modification modification : 
mergingModification.getModifications()) {
-          seqFile.getModFile().write(modification);
-        }
-        try {
-          seqFile.getModFile().close();
-        } catch (IOException e) {
-          logger
-              .error("Cannot close the ModificationFile {}", 
seqFile.getModFile().getFilePath(), e);
-        }
-      }
-    } catch (IOException e) {
-      logger.error("{} cannot clean the ModificationFile of {} after merge", 
storageGroupName,
-          seqFile.getTsFile(), e);
-    }
-  }
-
-  private void removeMergingModification() {
-    try {
-      if (mergingModification != null) {
-        mergingModification.remove();
-        mergingModification = null;
-      }
-    } catch (IOException e) {
-      logger.error("{} cannot remove merging modification ", storageGroupName, 
e);
-    }
-  }
-
-  protected void mergeEndAction(List<TsFileResource> seqFiles, 
List<TsFileResource> unseqFiles,
-      File mergeLog) {
-    logger.info("{} a merge task is ending...", storageGroupName);
-
-    if (unseqFiles.isEmpty()) {
-      // merge runtime exception arose, just end this merge
-      isMerging = false;
-      logger.info("{} a merge task abnormally ends", storageGroupName);
-      return;
-    }
-
-    removeUnseqFiles(unseqFiles);
-
-    for (int i = 0; i < seqFiles.size(); i++) {
-      TsFileResource seqFile = seqFiles.get(i);
-      // get both seqFile lock and merge lock
-      doubleWriteLock(seqFile);
-
-      try {
-        updateMergeModification(seqFile);
-        if (i == seqFiles.size() - 1) {
-          //FIXME if there is an exception, the the modification file will be 
not closed.
-          removeMergingModification();
-          isMerging = false;
-          mergeLog.delete();
-        }
-      } finally {
-        doubleWriteUnlock(seqFile);
-      }
-    }
-    logger.info("{} a merge task ends", storageGroupName);
-  }
-
-  /**
-   * acquire the write locks of the resource , the merge lock and the hot 
compaction lock
-   */
-  private void doubleWriteLock(TsFileResource seqFile) {
-    boolean fileLockGot;
-    boolean mergeLockGot;
-    boolean hotCompactionLockGot;
-    while (true) {
-      fileLockGot = seqFile.tryWriteLock();
-      mergeLockGot = mergeLock.writeLock().tryLock();
-      hotCompactionLockGot = tsFileManagement.tryWriteLock();
-
-      if (fileLockGot && mergeLockGot && hotCompactionLockGot) {
-        break;
-      } else {
-        // did not get all of them, release the gotten one and retry
-        if (hotCompactionLockGot) {
-          tsFileManagement.writeUnlock();
-        }
-        if (mergeLockGot) {
-          mergeLock.writeLock().unlock();
-        }
-        if (fileLockGot) {
-          seqFile.writeUnlock();
-        }
-      }
-    }
-  }
-
-  /**
-   * release the write locks of the resource , the merge lock and the hot 
compaction lock
-   */
-  private void doubleWriteUnlock(TsFileResource seqFile) {
-    tsFileManagement.writeUnlock();
-    mergeLock.writeLock().unlock();
-    seqFile.writeUnlock();
-  }
-
   /**
    * Load a new tsfile to storage group processor. The file may have overlap 
with other files.
    * <p>
@@ -1888,7 +1662,7 @@ public class StorageGroupProcessor {
     File tsfileToBeInserted = newTsFileResource.getTsFile();
     long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
     writeLock();
-    mergeLock.writeLock().lock();
+    tsFileManagement.mergeLock.writeLock().lock();
     tsFileManagement.writeLock();
     try {
       if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, 
newTsFileResource,
@@ -1903,7 +1677,7 @@ public class StorageGroupProcessor {
       throw new LoadFileException(e);
     } finally {
       tsFileManagement.writeUnlock();
-      mergeLock.writeLock().unlock();
+      tsFileManagement.mergeLock.writeLock().unlock();
       writeUnlock();
     }
   }
@@ -1926,7 +1700,7 @@ public class StorageGroupProcessor {
     File tsfileToBeInserted = newTsFileResource.getTsFile();
     long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
     writeLock();
-    mergeLock.writeLock().lock();
+    tsFileManagement.mergeLock.writeLock().lock();
     tsFileManagement.writeLock();
     try {
       List<TsFileResource> sequenceList = tsFileManagement.getTsFileList(true);
@@ -1972,7 +1746,7 @@ public class StorageGroupProcessor {
       throw new LoadFileException(e);
     } finally {
       tsFileManagement.writeUnlock();
-      mergeLock.writeLock().unlock();
+      tsFileManagement.mergeLock.writeLock().unlock();
       writeUnlock();
     }
   }
@@ -2165,9 +1939,9 @@ public class StorageGroupProcessor {
    * returns directly; otherwise, the time stamp is the mean of the timestamps 
of the two files, the
    * version number is the version number in the tsfile with a larger 
timestamp.
    *
-   * @param tsfileName  origin tsfile name
+   * @param tsfileName origin tsfile name
    * @param insertIndex the new file will be inserted between the files 
[insertIndex, insertIndex +
-   *                    1]
+   * 1]
    * @return appropriate filename
    */
   private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
@@ -2231,8 +2005,8 @@ public class StorageGroupProcessor {
   /**
    * Execute the loading process by the type.
    *
-   * @param type            load type
-   * @param tsFileResource  tsfile resource to be loaded
+   * @param type load type
+   * @param tsFileResource tsfile resource to be loaded
    * @param filePartitionId the partition id of the new file
    * @return load the file successfully
    * @UsedBy sync module, load external tsfile module.
@@ -2323,7 +2097,7 @@ public class StorageGroupProcessor {
    */
   public boolean deleteTsfile(File tsfieToBeDeleted) {
     writeLock();
-    mergeLock.writeLock().lock();
+    tsFileManagement.mergeLock.writeLock().lock();
     tsFileManagement.writeLock();
     TsFileResource tsFileResourceToBeDeleted = null;
     try {
@@ -2349,7 +2123,7 @@ public class StorageGroupProcessor {
       }
     } finally {
       tsFileManagement.writeUnlock();
-      mergeLock.writeLock().unlock();
+      tsFileManagement.mergeLock.writeLock().unlock();
       writeUnlock();
     }
     if (tsFileResourceToBeDeleted == null) {
@@ -2383,7 +2157,7 @@ public class StorageGroupProcessor {
    */
   public boolean moveTsfile(File fileToBeMoved, File targetDir) {
     writeLock();
-    mergeLock.writeLock().lock();
+    tsFileManagement.mergeLock.writeLock().lock();
     tsFileManagement.writeLock();
     TsFileResource tsFileResourceToBeMoved = null;
     try {
@@ -2409,7 +2183,7 @@ public class StorageGroupProcessor {
       }
     } finally {
       tsFileManagement.writeUnlock();
-      mergeLock.writeLock().unlock();
+      tsFileManagement.mergeLock.writeLock().unlock();
       writeUnlock();
     }
     if (tsFileResourceToBeMoved == null) {
@@ -2486,13 +2260,41 @@ public class StorageGroupProcessor {
     return 
partitionFileVersions.containsAll(tsFileResource.getHistoricalVersions());
   }
 
+  private enum LoadTsFileType {
+    LOAD_SEQUENCE, LOAD_UNSEQUENCE
+  }
+
+  @FunctionalInterface
+  public interface CloseTsFileCallBack {
+
+    void call(TsFileProcessor caller) throws TsFileProcessorException, 
IOException;
+  }
+
+  @FunctionalInterface
+  public interface UpdateEndTimeCallBack {
+
+    boolean call(TsFileProcessor caller);
+  }
+
+  @FunctionalInterface
+  public interface UpgradeTsFileResourceCallBack {
+
+    void call(TsFileResource caller);
+  }
+
+  @FunctionalInterface
+  public interface CloseHotCompactionMergeCallBack {
+
+    void call();
+  }
+
   /**
    * remove all partitions that satisfy a filter.
    */
   public void removePartitions(TimePartitionFilter filter) {
     // this requires blocking all other activities
     insertLock.writeLock().lock();
-    mergeLock.writeLock().lock();
+    tsFileManagement.mergeLock.writeLock().lock();
     try {
       // abort ongoing merges
       MergeManager.getINSTANCE().abortMerge(storageGroupName);
@@ -2506,7 +2308,7 @@ public class StorageGroupProcessor {
 
     } finally {
       insertLock.writeLock().unlock();
-      mergeLock.writeLock().unlock();
+      tsFileManagement.mergeLock.writeLock().unlock();
     }
   }
 
@@ -2536,38 +2338,6 @@ public class StorageGroupProcessor {
     }
   }
 
-  public boolean isHotCompactionMergeWorking() {
-    return hotCompactionMergeWorking;
-  }
-
-  private enum LoadTsFileType {
-    LOAD_SEQUENCE, LOAD_UNSEQUENCE
-  }
-
-  @FunctionalInterface
-  public interface CloseTsFileCallBack {
-
-    void call(TsFileProcessor caller) throws TsFileProcessorException, 
IOException;
-  }
-
-  @FunctionalInterface
-  public interface UpdateEndTimeCallBack {
-
-    boolean call(TsFileProcessor caller);
-  }
-
-  @FunctionalInterface
-  public interface UpgradeTsFileResourceCallBack {
-
-    void call(TsFileResource caller);
-  }
-
-  @FunctionalInterface
-  public interface CloseHotCompactionMergeCallBack {
-
-    void call();
-  }
-
   @FunctionalInterface
   public interface TimePartitionFilter {
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/TsFileManagement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/TsFileManagement.java
index dcb2aaa..a59bee4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/TsFileManagement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/TsFileManagement.java
@@ -19,24 +19,56 @@
 
 package org.apache.iotdb.db.engine.tsfilemanagement;
 
+import static 
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.MERGING_MODIFICATION_FILE_NAME;
+
+import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.merge.manage.MergeManager;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.selector.IMergeFileSelector;
+import org.apache.iotdb.db.engine.merge.selector.MaxFileMergeFileSelector;
+import org.apache.iotdb.db.engine.merge.selector.MaxSeriesMergeFileSelector;
+import org.apache.iotdb.db.engine.merge.selector.MergeFileStrategy;
+import org.apache.iotdb.db.engine.merge.task.MergeTask;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
 import 
org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.CloseHotCompactionMergeCallBack;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.tsfilemanagement.level.LevelTsFileManagement;
+import org.apache.iotdb.db.exception.MergeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class TsFileManagement {
 
+  private static final Logger logger = 
LoggerFactory.getLogger(TsFileManagement.class);
   protected String storageGroupName;
   protected String storageGroupDir;
+
+  /**
+   * mergeLock is to be used in the merge process. Concurrent queries, 
deletions and merges may
+   * result in losing some deletions in the merged new file, so a lock is 
necessary.
+   */
+  public ReentrantReadWriteLock mergeLock = new ReentrantReadWriteLock();
   /**
    * hotCompactionMergeLock is used to wait for TsFile list change in hot 
compaction merge
    * processor.
    */
   private final ReadWriteLock hotCompactionMergeLock = new 
ReentrantReadWriteLock();
 
+  public volatile boolean isUnseqMerging = false;
+  /**
+   * This is the modification file of the result of the current merge. Because 
the merged file may
+   * be invisible at this moment, without this, deletion/update during merge 
could be lost.
+   */
+  public ModificationFile mergingModification;
+  public long mergeStartTime;
+
   public TsFileManagement(String storageGroupName, String storageGroupDir) {
     this.storageGroupName = storageGroupName;
     this.storageGroupDir = storageGroupDir;
@@ -146,4 +178,202 @@ public abstract class TsFileManagement {
       closeHotCompactionMergeCallBack.call();
     }
   }
+
+  public void merge(boolean fullMerge, List<TsFileResource> seqMergeList,
+      List<TsFileResource> unSeqMergeList, long dataTTL) {
+    if (isUnseqMerging) {
+      if (logger.isInfoEnabled()) {
+        logger.info("{} Last merge is ongoing, currently consumed time: {}ms", 
storageGroupName,
+            (System.currentTimeMillis() - mergeStartTime));
+      }
+      return;
+    }
+    logger.info("{} will close all files for starting a merge (fullmerge = 
{})", storageGroupName,
+        fullMerge);
+
+    if (seqMergeList.isEmpty() || unSeqMergeList.isEmpty()) {
+      logger.info("{} no files to be merged", storageGroupName);
+      return;
+    }
+
+    long budget = 
IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
+    long timeLowerBound = System.currentTimeMillis() - dataTTL;
+    MergeResource mergeResource = new MergeResource(seqMergeList, 
unSeqMergeList, timeLowerBound);
+
+    IMergeFileSelector fileSelector = getMergeFileSelector(budget, 
mergeResource);
+    try {
+      List[] mergeFiles = fileSelector.select();
+      if (mergeFiles.length == 0) {
+        logger.info("{} cannot select merge candidates under the budget {}", 
storageGroupName,
+            budget);
+        return;
+      }
+      // avoid pending tasks holding the metadata and streams
+      mergeResource.clear();
+      String taskName = storageGroupName + "-" + System.currentTimeMillis();
+      // do not cache metadata until true candidates are chosen, or too much 
metadata will be
+      // cached during selection
+      mergeResource.setCacheDeviceMeta(true);
+
+      for (TsFileResource tsFileResource : mergeResource.getSeqFiles()) {
+        tsFileResource.setMerging(true);
+      }
+      for (TsFileResource tsFileResource : mergeResource.getUnseqFiles()) {
+        tsFileResource.setMerging(true);
+      }
+
+      MergeTask mergeTask = new MergeTask(mergeResource, storageGroupDir,
+          this::mergeEndAction, taskName, fullMerge, 
fileSelector.getConcurrentMergeNum(),
+          storageGroupName);
+      mergingModification = new ModificationFile(
+          storageGroupDir + File.separator + MERGING_MODIFICATION_FILE_NAME);
+      MergeManager.getINSTANCE().submitMainTask(mergeTask);
+      if (logger.isInfoEnabled()) {
+        logger.info("{} submits a merge task {}, merging {} seqFiles, {} 
unseqFiles",
+            storageGroupName, taskName, mergeFiles[0].size(), 
mergeFiles[1].size());
+      }
+      isUnseqMerging = true;
+      mergeStartTime = System.currentTimeMillis();
+
+    } catch (MergeException | IOException e) {
+      logger.error("{} cannot select file for merge", storageGroupName, e);
+    }
+  }
+
+  private IMergeFileSelector getMergeFileSelector(long budget, MergeResource 
resource) {
+    MergeFileStrategy strategy = 
IoTDBDescriptor.getInstance().getConfig().getMergeFileStrategy();
+    switch (strategy) {
+      case MAX_FILE_NUM:
+        return new MaxFileMergeFileSelector(resource, budget);
+      case MAX_SERIES_NUM:
+        return new MaxSeriesMergeFileSelector(resource, budget);
+      default:
+        throw new UnsupportedOperationException("Unknown MergeFileStrategy " + 
strategy);
+    }
+  }
+
+  /**
+   * acquire the write locks of the resource, the merge lock and the hot 
compaction lock
+   */
+  private void doubleWriteLock(TsFileResource seqFile) {
+    boolean fileLockGot;
+    boolean mergeLockGot;
+    boolean hotCompactionLockGot;
+    while (true) {
+      fileLockGot = seqFile.tryWriteLock();
+      mergeLockGot = mergeLock.writeLock().tryLock();
+      hotCompactionLockGot = tryWriteLock();
+
+      if (fileLockGot && mergeLockGot && hotCompactionLockGot) {
+        break;
+      } else {
+        // did not get all of them, release the gotten one and retry
+        if (hotCompactionLockGot) {
+          writeUnlock();
+        }
+        if (mergeLockGot) {
+          mergeLock.writeLock().unlock();
+        }
+        if (fileLockGot) {
+          seqFile.writeUnlock();
+        }
+      }
+    }
+  }
+
+  /**
+   * release the write locks of the resource, the merge lock and the hot 
compaction lock
+   */
+  private void doubleWriteUnlock(TsFileResource seqFile) {
+    writeUnlock();
+    mergeLock.writeLock().unlock();
+    seqFile.writeUnlock();
+  }
+
+  private void removeUnseqFiles(List<TsFileResource> unseqFiles) {
+    mergeLock.writeLock().lock();
+    writeLock();
+    try {
+      removeAll(unseqFiles, false);
+    } finally {
+      writeUnlock();
+      mergeLock.writeLock().unlock();
+    }
+
+    for (TsFileResource unseqFile : unseqFiles) {
+      unseqFile.writeLock();
+      try {
+        unseqFile.remove();
+      } finally {
+        unseqFile.writeUnlock();
+      }
+    }
+  }
+
+  @SuppressWarnings("squid:S1141")
+  private void updateMergeModification(TsFileResource seqFile) {
+    try {
+      // remove old modifications and write modifications generated during 
merge
+      seqFile.removeModFile();
+      if (mergingModification != null) {
+        for (Modification modification : 
mergingModification.getModifications()) {
+          seqFile.getModFile().write(modification);
+        }
+        try {
+          seqFile.getModFile().close();
+        } catch (IOException e) {
+          logger
+              .error("Cannot close the ModificationFile {}", 
seqFile.getModFile().getFilePath(), e);
+        }
+      }
+    } catch (IOException e) {
+      logger.error("{} cannot clean the ModificationFile of {} after merge", 
storageGroupName,
+          seqFile.getTsFile(), e);
+    }
+  }
+
+  private void removeMergingModification() {
+    try {
+      if (mergingModification != null) {
+        mergingModification.remove();
+        mergingModification = null;
+      }
+    } catch (IOException e) {
+      logger.error("{} cannot remove merging modification ", storageGroupName, 
e);
+    }
+  }
+
+  public void mergeEndAction(List<TsFileResource> seqFiles, 
List<TsFileResource> unseqFiles,
+      File mergeLog) {
+    logger.info("{} a merge task is ending...", storageGroupName);
+
+    if (unseqFiles.isEmpty()) {
+      // merge runtime exception arose, just end this merge
+      isUnseqMerging = false;
+      logger.info("{} a merge task abnormally ends", storageGroupName);
+      return;
+    }
+
+    removeUnseqFiles(unseqFiles);
+
+    for (int i = 0; i < seqFiles.size(); i++) {
+      TsFileResource seqFile = seqFiles.get(i);
+      // get both seqFile lock and merge lock
+      doubleWriteLock(seqFile);
+
+      try {
+        updateMergeModification(seqFile);
+        if (i == seqFiles.size() - 1) {
+          //FIXME if there is an exception, the modification file will not be 
closed.
+          removeMergingModification();
+          isUnseqMerging = false;
+          mergeLog.delete();
+        }
+      } finally {
+        doubleWriteUnlock(seqFile);
+      }
+    }
+    logger.info("{} a merge task ends", storageGroupName);
+  }
+
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
index 2b852d2..8c5f482 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
@@ -35,7 +35,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -44,16 +43,23 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
 import com.clearspring.analytics.stream.cardinality.ICardinality;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
 import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
-import org.apache.iotdb.db.engine.cache.FileChunkPointSizeCache;
+import org.apache.iotdb.db.engine.merge.manage.MergeManager;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.selector.IMergeFileSelector;
+import org.apache.iotdb.db.engine.merge.task.MergeTask;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagement;
 import 
org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogAnalyzer;
 import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger;
 import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionUtils;
+import org.apache.iotdb.db.exception.MergeException;
 import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.slf4j.Logger;
@@ -66,21 +72,26 @@ public class LevelTsFileManagement extends TsFileManagement 
{
 
   private static final Logger logger = 
LoggerFactory.getLogger(LevelTsFileManagement.class);
   private final int maxLevelNum = 
IoTDBDescriptor.getInstance().getConfig().getMaxLevelNum();
+  private final int maxFileNumInEachLevel = 
IoTDBDescriptor.getInstance().getConfig()
+      .getMaxFileNumInEachLevel();
+  private final int maxUnseqLevelNum = 
IoTDBDescriptor.getInstance().getConfig()
+      .getMaxUnseqLevelNum();
+  private final int maxUnseqFileNumInEachLevel = 
IoTDBDescriptor.getInstance().getConfig()
+      .getMaxFileNumInEachLevel();
   private final int maxChunkPointNum = 
IoTDBDescriptor.getInstance().getConfig()
       .getMergeChunkPointNumberThreshold();
+  private final boolean isForceFullMerge = 
IoTDBDescriptor.getInstance().getConfig()
+      .isForceFullMerge();
   // First map is partition list; Second list is level list; Third list is 
file list in level;
   private final Map<Long, List<TreeSet<TsFileResource>>> 
sequenceTsFileResources = new ConcurrentSkipListMap<>();
   private final Map<Long, List<List<TsFileResource>>> 
unSequenceTsFileResources = new ConcurrentSkipListMap<>();
   private final List<List<TsFileResource>> forkedSequenceTsFileResources = new 
ArrayList<>();
   private final List<List<TsFileResource>> forkedUnSequenceTsFileResources = 
new ArrayList<>();
 
-  // Deciding whether or not to merge;
-  //private boolean forkedSeqListMergeFlag = false;
-  //private boolean forkedUnSeqListMergeFlag = false;
   private double forkedSeqListPointNum = 0;
-  private double forkedSeqListDeviceSize = 0;
+  private double forkedSeqListMeasurementSize = 0;
   private double forkedUnSeqListPointNum = 0;
-  private double forkedUnSeqListDeviceSize = 0;
+  private double forkedUnSeqListMeasurementSize = 0;
 
   public LevelTsFileManagement(String storageGroupName, String 
storageGroupDir) {
     super(storageGroupName, storageGroupDir);
@@ -94,6 +105,8 @@ public class LevelTsFileManagement extends TsFileManagement {
     }
     for (int i = 0; i < maxLevelNum; i++) {
       
sequenceTsFileResources.get(timePartitionId).get(i).removeAll(mergeTsFiles);
+    }
+    for (int i = 0; i < maxUnseqLevelNum; i++) {
       
unSequenceTsFileResources.get(timePartitionId).get(i).removeAll(mergeTsFiles);
     }
   }
@@ -116,7 +129,8 @@ public class LevelTsFileManagement extends TsFileManagement 
{
       List<List<TsFileResource>> currMergeFiles,
       HotCompactionLogger hotCompactionLogger, boolean sequence) throws 
IOException {
     TsFileResource sourceFile = currMergeFiles.get(0).get(0);
-    File newTargetFile = createNewTsFileName(sourceFile.getTsFile(), 
maxLevelNum - 1);
+    File newTargetFile = createNewTsFileName(sourceFile.getTsFile(),
+        sequence ? (maxLevelNum - 1) : (maxUnseqLevelNum - 1));
     TsFileResource targetResource = new TsFileResource(newTargetFile);
     List<TsFileResource> mergeFiles = new ArrayList<>();
     for (int i = currMergeFiles.size() - 1; i >= 0; i--) {
@@ -128,8 +142,14 @@ public class LevelTsFileManagement extends 
TsFileManagement {
     hotCompactionLogger.logSequence(sequence);
     hotCompactionLogger.logFile(TARGET_NAME, newTargetFile);
     writeLock();
-    for (int i = 0; i < maxLevelNum - 1; i++) {
-      deleteLevelFiles(timePartitionId, currMergeFiles.get(i));
+    if (sequence) {
+      for (int i = 0; i < maxLevelNum - 1; i++) {
+        deleteLevelFiles(timePartitionId, currMergeFiles.get(i));
+      }
+    } else {
+      for (int i = 0; i < maxUnseqLevelNum - 1; i++) {
+        deleteLevelFiles(timePartitionId, currMergeFiles.get(i));
+      }
     }
     writeUnlock();
     hotCompactionLogger.logMergeFinish();
@@ -144,7 +164,7 @@ public class LevelTsFileManagement extends TsFileManagement 
{
       }
     } else {
       for (List<List<TsFileResource>> unSequenceTsFileList : 
unSequenceTsFileResources.values()) {
-        result.addAll(unSequenceTsFileList.get(maxLevelNum - 1));
+        result.addAll(unSequenceTsFileList.get(maxUnseqLevelNum - 1));
       }
     }
     return result;
@@ -212,25 +232,25 @@ public class LevelTsFileManagement extends 
TsFileManagement {
   public void add(TsFileResource tsFileResource, boolean sequence) {
     long timePartitionId = tsFileResource.getTimePartition();
     int level = getMergeLevel(tsFileResource.getTsFile());
-    if (level <= maxLevelNum - 1) {
-      if (sequence) {
+    if (sequence) {
+      if (level <= maxLevelNum - 1) {
         sequenceTsFileResources
             .computeIfAbsent(timePartitionId, 
this::newSequenceTsFileResources).get(level)
             .add(tsFileResource);
       } else {
-        unSequenceTsFileResources
-            .computeIfAbsent(timePartitionId, 
this::newUnSequenceTsFileResources).get(level)
+        sequenceTsFileResources
+            .computeIfAbsent(timePartitionId, 
this::newSequenceTsFileResources).get(maxLevelNum - 1)
             .add(tsFileResource);
       }
     } else {
-      if (sequence) {
-        sequenceTsFileResources
-            .computeIfAbsent(timePartitionId, 
this::newSequenceTsFileResources).get(maxLevelNum - 1)
+      if (level <= maxUnseqLevelNum - 1) {
+        unSequenceTsFileResources
+            .computeIfAbsent(timePartitionId, 
this::newUnSequenceTsFileResources).get(level)
             .add(tsFileResource);
       } else {
         unSequenceTsFileResources
             .computeIfAbsent(timePartitionId, 
this::newUnSequenceTsFileResources)
-            .get(maxLevelNum - 1).add(tsFileResource);
+            .get(maxUnseqLevelNum - 1).add(tsFileResource);
       }
     }
   }
@@ -306,7 +326,7 @@ public class LevelTsFileManagement extends TsFileManagement 
{
     } else {
       for (List<List<TsFileResource>> partitionUnSequenceTsFileResource : 
unSequenceTsFileResources
           .values()) {
-        for (int i = maxLevelNum - 1; i >= 0; i--) {
+        for (int i = maxUnseqLevelNum - 1; i >= 0; i--) {
           result += partitionUnSequenceTsFileResource.get(i).size();
         }
       }
@@ -413,41 +433,55 @@ public class LevelTsFileManagement extends 
TsFileManagement {
   public void forkCurrentFileList(long timePartition) {
     Pair<Double, Double> seqResult = forkTsFileList(
         forkedSequenceTsFileResources,
-        sequenceTsFileResources.computeIfAbsent(timePartition, 
this::newSequenceTsFileResources));
+        sequenceTsFileResources.computeIfAbsent(timePartition, 
this::newSequenceTsFileResources),
+        maxLevelNum);
     forkedSeqListPointNum = seqResult.left;
-    forkedSeqListDeviceSize = seqResult.right;
+    forkedSeqListMeasurementSize = seqResult.right;
     Pair<Double, Double> unSeqResult = forkTsFileList(
         forkedUnSequenceTsFileResources,
-        unSequenceTsFileResources.computeIfAbsent(timePartition, 
this::newUnSequenceTsFileResources));
+        unSequenceTsFileResources
+            .computeIfAbsent(timePartition, 
this::newUnSequenceTsFileResources), maxUnseqLevelNum);
     forkedUnSeqListPointNum = unSeqResult.left;
-    forkedUnSeqListDeviceSize = unSeqResult.right;
+    forkedUnSeqListMeasurementSize = unSeqResult.right;
   }
 
   private Pair<Double, Double> forkTsFileList(
       List<List<TsFileResource>> forkedTsFileResources,
-      List rawTsFileResources) {
+      List rawTsFileResources, int currMaxLevel) {
     forkedTsFileResources.clear();
     // just fork part of the TsFile list, controlled by max_merge_chunk_point
     long pointNum = 0;
     // all flush to target file
-//  Set<String> deviceSet = new HashSet<>();
-    ICardinality deviceSet = new HyperLogLog(13);
-    for (int i = 0; i < maxLevelNum - 1; i++) {
+    ICardinality measurementSet = new HyperLogLog(13);
+    for (int i = 0; i < currMaxLevel - 1; i++) {
       List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
       Collection<TsFileResource> levelRawTsFileResources = 
(Collection<TsFileResource>) rawTsFileResources
           .get(i);
       synchronized (levelRawTsFileResources) {
         for (TsFileResource tsFileResource : levelRawTsFileResources) {
           if (tsFileResource.isClosed()) {
-            Map<String, Long> chunkPointMap = 
FileChunkPointSizeCache.getInstance()
-                .get(tsFileResource.getTsFile());
-            for (Entry<String, Long> deviceChunkPoint : 
chunkPointMap.entrySet()) {
-              deviceSet.offer(deviceChunkPoint.getKey());
-              pointNum += deviceChunkPoint.getValue();
+            String path = tsFileResource.getTsFile().getAbsolutePath();
+            try {
+              if (tsFileResource.getTsFile().exists()) {
+                TsFileSequenceReader reader = new TsFileSequenceReader(path);
+                List<Path> pathList = reader.getAllPaths();
+                for (Path sensorPath : pathList) {
+                  measurementSet.offer(sensorPath.getFullPath());
+                  List<ChunkMetadata> chunkMetadataList = 
reader.getChunkMetadataList(sensorPath);
+                  for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+                    pointNum += chunkMetadata.getNumOfPoints();
+                  }
+                }
+              } else {
+                logger.info("{} tsfile does not exist", path);
+              }
+            } catch (IOException e) {
+              logger.error(
+                  "{} tsfile reader creates error", path, e);
             }
           }
-          if (deviceSet.cardinality() > 0
-              && pointNum / deviceSet.cardinality() >= maxChunkPointNum) {
+          if (measurementSet.cardinality() > 0
+              && pointNum / measurementSet.cardinality() >= maxChunkPointNum) {
             forkedLevelTsFileResources.add(tsFileResource);
             break;
           }
@@ -455,112 +489,94 @@ public class LevelTsFileManagement extends 
TsFileManagement {
         }
       }
 
-      if (deviceSet.cardinality() > 0
-          && pointNum / deviceSet.cardinality() >= maxChunkPointNum) {
+      if (measurementSet.cardinality() > 0
+          && pointNum / measurementSet.cardinality() >= maxChunkPointNum) {
         forkedTsFileResources.add(forkedLevelTsFileResources);
         break;
       }
       forkedTsFileResources.add(forkedLevelTsFileResources);
-      //System.out.println(forkedLevelTsFileResources.size());
-      //System.out.println(forkedTsFileResources.get(i).size());
     }
 
     // fill in empty file
-    while (forkedTsFileResources.size() < maxLevelNum) {
+    while (forkedTsFileResources.size() < currMaxLevel) {
       List<TsFileResource> emptyForkedLevelTsFileResources = new ArrayList<>();
       forkedTsFileResources.add(emptyForkedLevelTsFileResources);
     }
 
-    //return forkedTsFileResources.size() > 1;
-    return new Pair<>((double)pointNum, (double)deviceSet.cardinality());
+    return new Pair<>((double) pointNum, (double) 
measurementSet.cardinality());
   }
 
-//  private Pair<Long, Set<String>> forkTsFileList(
-//      List<List<TsFileResource>> forkedTsFileResources,
-//      List rawTsFileResources) {
-//    forkedTsFileResources.clear();
-//    // just fork part of the TsFile list, controlled by max_merge_chunk_point
-//    long pointNum = 0;
-//    // all flush to target file
-//    Set<String> deviceSet = new HashSet<>();
-//    for (int i = 0; i < maxLevelNum - 1; i++) {
-//      List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
-//      Collection<TsFileResource> levelRawTsFileResources = 
(Collection<TsFileResource>) rawTsFileResources
-//          .get(i);
-//      synchronized (levelRawTsFileResources) {
-//        for (TsFileResource tsFileResource : levelRawTsFileResources) {
-//          if (tsFileResource.isClosed()) {
-//            forkedLevelTsFileResources.add(tsFileResource);
-//          }
-//        }
-//      }
-//      forkedTsFileResources.add(forkedLevelTsFileResources);
-//    }
-//    return new Pair<>(pointNum, deviceSet);
-//  }
-
   @Override
   protected void merge(long timePartition) {
-    merge(forkedSequenceTsFileResources, true, timePartition);
-    merge(forkedUnSequenceTsFileResources, false, timePartition);
+    merge(forkedSequenceTsFileResources, true, timePartition, maxLevelNum, 
maxFileNumInEachLevel);
+    if (maxUnseqLevelNum <= 1) {
+      merge(isForceFullMerge, getTsFileList(true), 
forkedUnSequenceTsFileResources.get(0),
+          Long.MAX_VALUE);
+    } else {
+      merge(forkedUnSequenceTsFileResources, false, timePartition, 
maxUnseqLevelNum,
+          maxUnseqFileNumInEachLevel);
+    }
   }
 
   @SuppressWarnings("squid:S3776")
   private void merge(List<List<TsFileResource>> mergeResources, boolean 
sequence,
-      long timePartition) {
+      long timePartition, int currMaxLevel, int currMaxFileNumInEachLevel) {
     long startTimeMillis = System.currentTimeMillis();
     try {
       logger.info("{} start to filter hot compaction condition", 
storageGroupName);
       double pointNum = sequence ? forkedSeqListPointNum : 
forkedUnSeqListPointNum;
-      double deviceSize =
-          sequence ? forkedSeqListDeviceSize : forkedSeqListDeviceSize;
-      //boolean mergeFlag = sequence ? forkedSeqListMergeFlag : 
forkedUnSeqListMergeFlag;
-
+      double measurementSize =
+          sequence ? forkedSeqListMeasurementSize : 
forkedUnSeqListMeasurementSize;
       logger
-          .info("{} current sg subLevel point num: {}, approximate device num: 
{}", storageGroupName, pointNum,
-                  deviceSize);
+          .info("{} current sg subLevel point num: {}, approximate measurement 
num: {}",
+              storageGroupName, pointNum,
+              measurementSize);
       HotCompactionLogger hotCompactionLogger = new 
HotCompactionLogger(storageGroupDir,
           storageGroupName);
-      if (deviceSize > 0 && pointNum / deviceSize >= maxChunkPointNum) {
+      if (measurementSize > 0 && pointNum / measurementSize >= 
maxChunkPointNum) {
         // merge all tsfile to last level
         logger.info("{} merge {} level tsfiles to next level", 
storageGroupName,
             mergeResources.size());
         flushAllFilesToLastLevel(timePartition, mergeResources, 
hotCompactionLogger, sequence);
       } else {
-      for (int i = 0; i < maxLevelNum - 1; i++) {
-        if (maxFileNumInEachLevel <= mergeResources.get(i).size()) {
-          for (TsFileResource mergeResource : mergeResources.get(i)) {
-            hotCompactionLogger.logFile(SOURCE_NAME, 
mergeResource.getTsFile());
-          }
-          File newLevelFile = 
createNewTsFileName(mergeResources.get(i).get(0).getTsFile(),
-              i + 1);
-          hotCompactionLogger.logSequence(sequence);
-          hotCompactionLogger.logFile(TARGET_NAME, newLevelFile);
-          logger.info("{} [Hot Compaction] merge level-{}'s {} tsfiles to next 
level vm",
-              storageGroupName, i, mergeResources.get(i).size());
-
-          TsFileResource newResource = new TsFileResource(newLevelFile);
-          HotCompactionUtils
-              .merge(newResource, mergeResources.get(i), storageGroupName, 
hotCompactionLogger,
-                  new HashSet<>(), sequence);
-          writeLock();
-          try {
-            deleteLevelFiles(timePartition, mergeResources.get(i));
-            hotCompactionLogger.logMergeFinish();
-            if (sequence) {
-              sequenceTsFileResources.get(timePartition).get(i + 
1).add(newResource);
+        for (int i = 0; i < currMaxLevel - 1; i++) {
+          if (currMaxFileNumInEachLevel <= mergeResources.get(i).size()) {
+            if (!sequence && i == currMaxLevel - 2) {
+              merge(isForceFullMerge, getTsFileList(true), 
mergeResources.get(i), Long.MAX_VALUE);
             } else {
-              unSequenceTsFileResources.get(timePartition).get(i + 
1).add(newResource);
-            }
-            if (mergeResources.size() > i + 1) {
-              mergeResources.get(i + 1).add(newResource);
+              for (TsFileResource mergeResource : mergeResources.get(i)) {
+                hotCompactionLogger.logFile(SOURCE_NAME, 
mergeResource.getTsFile());
+              }
+              File newLevelFile = 
createNewTsFileName(mergeResources.get(i).get(0).getTsFile(),
+                  i + 1);
+              hotCompactionLogger.logSequence(sequence);
+              hotCompactionLogger.logFile(TARGET_NAME, newLevelFile);
+              logger.info("{} [Hot Compaction] merge level-{}'s {} tsfiles to 
next level vm",
+                  storageGroupName, i, mergeResources.get(i).size());
+
+              TsFileResource newResource = new TsFileResource(newLevelFile);
+              HotCompactionUtils
+                  .merge(newResource, mergeResources.get(i), storageGroupName, 
hotCompactionLogger,
+                      new HashSet<>(), sequence);
+              writeLock();
+              try {
+                deleteLevelFiles(timePartition, mergeResources.get(i));
+                hotCompactionLogger.logMergeFinish();
+                if (sequence) {
+                  sequenceTsFileResources.get(timePartition).get(i + 
1).add(newResource);
+                } else {
+                  unSequenceTsFileResources.get(timePartition).get(i + 
1).add(newResource);
+                }
+                if (mergeResources.size() > i + 1) {
+                  mergeResources.get(i + 1).add(newResource);
+                }
+              } finally {
+                writeUnlock();
+              }
             }
-          } finally {
-            writeUnlock();
           }
         }
       }
-      }
       hotCompactionLogger.close();
       File logFile = FSFactoryProducer.getFSFactory()
           .getFile(storageGroupDir, storageGroupName + 
HOT_COMPACTION_LOG_NAME);
@@ -610,7 +626,7 @@ public class LevelTsFileManagement extends TsFileManagement 
{
 
   private List<List<TsFileResource>> newUnSequenceTsFileResources(Long k) {
     List<List<TsFileResource>> newUnSequenceTsFileResources = new 
CopyOnWriteArrayList<>();
-    for (int i = 0; i < maxLevelNum; i++) {
+    for (int i = 0; i < maxUnseqLevelNum; i++) {
       newUnSequenceTsFileResources.add(new CopyOnWriteArrayList<>());
     }
     return newUnSequenceTsFileResources;
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java 
b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java
index c77bad7..d03abc0 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java
@@ -36,9 +36,9 @@ public class MergeManagerTest extends MergeTest {
     RateLimiter compactionRateLimiter = 
MergeManager.getINSTANCE().getMergeRateLimiter();
     long startTime = System.currentTimeMillis();
     MergeManager.mergeRateLimiterAcquire(compactionRateLimiter, 160 * 1024 * 
1024L);
-    assertTrue((System.currentTimeMillis() - startTime) < 1000);
+    assertTrue((System.currentTimeMillis() - startTime) <= 1000);
     MergeManager.mergeRateLimiterAcquire(compactionRateLimiter, 16 * 1024 * 
1024L);
-    assertTrue((System.currentTimeMillis() - startTime) > 1000);
+    assertTrue((System.currentTimeMillis() - startTime) >= 9000);
   }
 
 
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index cfa7c64..d33c92c 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
 import org.apache.iotdb.db.constant.TestConstant;
@@ -41,6 +40,7 @@ import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
@@ -66,7 +66,6 @@ public class StorageGroupProcessorTest {
   private String measurementId = "s0";
   private StorageGroupProcessor processor;
   private QueryContext context = EnvironmentUtils.TEST_QUERY_CONTEXT;
-  private AtomicLong mergeLock;
 
   @Before
   public void setUp() throws Exception {
@@ -158,8 +157,9 @@ public class StorageGroupProcessorTest {
       e.printStackTrace();
     }
     processor.syncCloseAllWorkingTsFileProcessors();
-    QueryDataSource queryDataSource = processor.query(new 
PartialPath(deviceId), measurementId, context,
-        null, null);
+    QueryDataSource queryDataSource = processor
+        .query(new PartialPath(deviceId), measurementId, context,
+            null, null);
     Assert.assertEquals(10, queryDataSource.getSeqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
       Assert.assertTrue(resource.isClosed());
@@ -177,15 +177,14 @@ public class StorageGroupProcessorTest {
     dataTypes.add(TSDataType.INT32.ordinal());
     dataTypes.add(TSDataType.INT64.ordinal());
 
-
     MeasurementMNode[] measurementMNodes = new MeasurementMNode[2];
     measurementMNodes[0] = new MeasurementMNode(null, "s0",
         new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
     measurementMNodes[1] = new MeasurementMNode(null, "s1",
         new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);
 
-
-    InsertTabletPlan insertTabletPlan1 = new InsertTabletPlan(new 
PartialPath("root.vehicle.d0"), measurements,
+    InsertTabletPlan insertTabletPlan1 = new InsertTabletPlan(new 
PartialPath("root.vehicle.d0"),
+        measurements,
         dataTypes);
     insertTabletPlan1.setMeasurementMNodes(measurementMNodes);
 
@@ -206,7 +205,8 @@ public class StorageGroupProcessorTest {
     processor.insertTablet(insertTabletPlan1);
     processor.asyncCloseAllWorkingTsFileProcessors();
 
-    InsertTabletPlan insertTabletPlan2 = new InsertTabletPlan(new 
PartialPath("root.vehicle.d0"), measurements,
+    InsertTabletPlan insertTabletPlan2 = new InsertTabletPlan(new 
PartialPath("root.vehicle.d0"),
+        measurements,
         dataTypes);
     insertTabletPlan2.setMeasurementMNodes(measurementMNodes);
 
@@ -223,8 +223,9 @@ public class StorageGroupProcessorTest {
     processor.asyncCloseAllWorkingTsFileProcessors();
     processor.syncCloseAllWorkingTsFileProcessors();
 
-    QueryDataSource queryDataSource = processor.query(new 
PartialPath(deviceId), measurementId, context,
-        null, null);
+    QueryDataSource queryDataSource = processor
+        .query(new PartialPath(deviceId), measurementId, context,
+            null, null);
 
     Assert.assertEquals(2, queryDataSource.getSeqResources().size());
     Assert.assertEquals(1, queryDataSource.getUnseqResources().size());
@@ -255,8 +256,9 @@ public class StorageGroupProcessorTest {
 
     processor.syncCloseAllWorkingTsFileProcessors();
 
-    QueryDataSource queryDataSource = processor.query(new 
PartialPath(deviceId), measurementId, context,
-        null, null);
+    QueryDataSource queryDataSource = processor
+        .query(new PartialPath(deviceId), measurementId, context,
+            null, null);
     Assert.assertEquals(10, queryDataSource.getSeqResources().size());
     Assert.assertEquals(10, queryDataSource.getUnseqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
@@ -268,291 +270,8 @@ public class StorageGroupProcessorTest {
   }
 
   @Test
-  public void testEnableDiscardOutOfOrderDataForInsertRowPlan()
-      throws WriteProcessException, QueryProcessException, 
IllegalPathException, IOException {
-    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    boolean defaultValue = config.isEnableDiscardOutOfOrderData();
-    config.setEnableDiscardOutOfOrderData(true);
-
-    for (int j = 21; j <= 30; j++) {
-      TSRecord record = new TSRecord(j, deviceId);
-      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, 
String.valueOf(j)));
-      insertToStorageGroupProcessor(record);
-      processor.asyncCloseAllWorkingTsFileProcessors();
-    }
-    processor.syncCloseAllWorkingTsFileProcessors();
-
-    for (int j = 10; j >= 1; j--) {
-      TSRecord record = new TSRecord(j, deviceId);
-      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, 
String.valueOf(j)));
-      insertToStorageGroupProcessor(record);
-      processor.asyncCloseAllWorkingTsFileProcessors();
-    }
-
-    processor.syncCloseAllWorkingTsFileProcessors();
-
-    for (TsFileProcessor tsfileProcessor : 
processor.getWorkUnsequenceTsFileProcessor()) {
-      tsfileProcessor.syncFlush();
-    }
-
-    QueryDataSource queryDataSource = processor.query(new 
PartialPath(deviceId), measurementId, context,
-        null, null);
-    Assert.assertEquals(10, queryDataSource.getSeqResources().size());
-    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
-    for (TsFileResource resource : queryDataSource.getSeqResources()) {
-      Assert.assertTrue(resource.isClosed());
-    }
-    for (TsFileResource resource : queryDataSource.getUnseqResources()) {
-      Assert.assertTrue(resource.isClosed());
-    }
-
-    config.setEnableDiscardOutOfOrderData(defaultValue);
-  }
-
-  @Test
-  public void testEnableDiscardOutOfOrderDataForInsertTablet1()
-      throws QueryProcessException, IllegalPathException, IOException {
-    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
-    long defaultTimePartition = config.getPartitionInterval();
-    boolean defaultEnablePartition = config.isEnablePartition();
-    config.setEnableDiscardOutOfOrderData(true);
-    config.setEnablePartition(true);
-    config.setPartitionInterval(100);
-
-    String[] measurements = new String[2];
-    measurements[0] = "s0";
-    measurements[1] = "s1";
-    List<Integer> dataTypes = new ArrayList<>();
-    dataTypes.add(TSDataType.INT32.ordinal());
-    dataTypes.add(TSDataType.INT64.ordinal());
-
-    MeasurementMNode[] measurementMNodes = new MeasurementMNode[2];
-    measurementMNodes[0] = new MeasurementMNode(null, "s0",
-        new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
-    measurementMNodes[1] = new MeasurementMNode(null, "s1",
-        new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);
-
-    InsertTabletPlan insertTabletPlan1 = new InsertTabletPlan(new 
PartialPath("root.vehicle.d0"), measurements,
-        dataTypes);
-
-    long[] times = new long[100];
-    Object[] columns = new Object[2];
-    columns[0] = new int[100];
-    columns[1] = new long[100];
-
-    for (int r = 0; r < 100; r++) {
-      times[r] = r;
-      ((int[]) columns[0])[r] = 1;
-      ((long[]) columns[1])[r] = 1;
-    }
-    insertTabletPlan1.setTimes(times);
-    insertTabletPlan1.setColumns(columns);
-    insertTabletPlan1.setRowCount(times.length);
-    insertTabletPlan1.setMeasurementMNodes(measurementMNodes);
-
-    processor.insertTablet(insertTabletPlan1);
-    processor.asyncCloseAllWorkingTsFileProcessors();
-
-    InsertTabletPlan insertTabletPlan2 = new InsertTabletPlan(new 
PartialPath("root.vehicle.d0"), measurements,
-        dataTypes);
-
-    for (int r = 149; r >= 50; r--) {
-      times[r - 50] = r;
-      ((int[]) columns[0])[r - 50] = 1;
-      ((long[]) columns[1])[r - 50] = 1;
-    }
-    insertTabletPlan2.setTimes(times);
-    insertTabletPlan2.setColumns(columns);
-    insertTabletPlan2.setRowCount(times.length);
-    insertTabletPlan2.setMeasurementMNodes(measurementMNodes);
-
-    processor.insertTablet(insertTabletPlan2);
-    processor.asyncCloseAllWorkingTsFileProcessors();
-    processor.syncCloseAllWorkingTsFileProcessors();
-
-    for (TsFileProcessor tsfileProcessor : 
processor.getWorkUnsequenceTsFileProcessor()) {
-      tsfileProcessor.syncFlush();
-    }
-
-    QueryDataSource queryDataSource = processor.query(new 
PartialPath(deviceId), measurementId, context,
-        null, null);
-
-    Assert.assertEquals(2, queryDataSource.getSeqResources().size());
-    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
-    for (TsFileResource resource : queryDataSource.getSeqResources()) {
-      Assert.assertTrue(resource.isClosed());
-    }
-
-    config.setEnableDiscardOutOfOrderData(defaultEnableDiscard);
-    config.setPartitionInterval(defaultTimePartition);
-    config.setEnablePartition(defaultEnablePartition);
-  }
-
-  @Test
-  public void testEnableDiscardOutOfOrderDataForInsertTablet2()
-      throws QueryProcessException, IllegalPathException, IOException {
-    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
-    long defaultTimePartition = config.getPartitionInterval();
-    boolean defaultEnablePartition = config.isEnablePartition();
-    config.setEnableDiscardOutOfOrderData(true);
-    config.setEnablePartition(true);
-    config.setPartitionInterval(1200);
-
-    String[] measurements = new String[2];
-    measurements[0] = "s0";
-    measurements[1] = "s1";
-    List<Integer> dataTypes = new ArrayList<>();
-    dataTypes.add(TSDataType.INT32.ordinal());
-    dataTypes.add(TSDataType.INT64.ordinal());
-
-    MeasurementMNode[] measurementMNodes = new MeasurementMNode[2];
-    measurementMNodes[0] = new MeasurementMNode(null, "s0",
-        new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
-    measurementMNodes[1] = new MeasurementMNode(null, "s1",
-        new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);
-
-    InsertTabletPlan insertTabletPlan1 = new InsertTabletPlan(new 
PartialPath("root.vehicle.d0"), measurements,
-        dataTypes);
-
-    long[] times = new long[1200];
-    Object[] columns = new Object[2];
-    columns[0] = new int[1200];
-    columns[1] = new long[1200];
-
-    for (int r = 0; r < 1200; r++) {
-      times[r] = r;
-      ((int[]) columns[0])[r] = 1;
-      ((long[]) columns[1])[r] = 1;
-    }
-    insertTabletPlan1.setTimes(times);
-    insertTabletPlan1.setColumns(columns);
-    insertTabletPlan1.setRowCount(times.length);
-    insertTabletPlan1.setMeasurementMNodes(measurementMNodes);
-
-    processor.insertTablet(insertTabletPlan1);
-    processor.asyncCloseAllWorkingTsFileProcessors();
-
-    InsertTabletPlan insertTabletPlan2 = new InsertTabletPlan(new 
PartialPath("root.vehicle.d0"), measurements,
-        dataTypes);
-
-    for (int r = 1249; r >= 50; r--) {
-      times[r - 50] = r;
-      ((int[]) columns[0])[r - 50] = 1;
-      ((long[]) columns[1])[r - 50] = 1;
-    }
-    insertTabletPlan2.setTimes(times);
-    insertTabletPlan2.setColumns(columns);
-    insertTabletPlan2.setRowCount(times.length);
-    insertTabletPlan2.setMeasurementMNodes(measurementMNodes);
-
-    processor.insertTablet(insertTabletPlan2);
-    processor.asyncCloseAllWorkingTsFileProcessors();
-    processor.syncCloseAllWorkingTsFileProcessors();
-
-    for (TsFileProcessor tsfileProcessor : 
processor.getWorkUnsequenceTsFileProcessor()) {
-      tsfileProcessor.syncFlush();
-    }
-
-    QueryDataSource queryDataSource = processor.query(new 
PartialPath(deviceId), measurementId, context,
-        null, null);
-
-    Assert.assertEquals(2, queryDataSource.getSeqResources().size());
-    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
-    for (TsFileResource resource : queryDataSource.getSeqResources()) {
-      Assert.assertTrue(resource.isClosed());
-    }
-
-    config.setEnableDiscardOutOfOrderData(defaultEnableDiscard);
-    config.setPartitionInterval(defaultTimePartition);
-    config.setEnablePartition(defaultEnablePartition);
-  }
-
-  @Test
-  public void testEnableDiscardOutOfOrderDataForInsertTablet3()
-      throws QueryProcessException, IllegalPathException, IOException {
-    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
-    long defaultTimePartition = config.getPartitionInterval();
-    boolean defaultEnablePartition = config.isEnablePartition();
-    config.setEnableDiscardOutOfOrderData(true);
-    config.setEnablePartition(true);
-    config.setPartitionInterval(1000);
-
-    String[] measurements = new String[2];
-    measurements[0] = "s0";
-    measurements[1] = "s1";
-    List<Integer> dataTypes = new ArrayList<>();
-    dataTypes.add(TSDataType.INT32.ordinal());
-    dataTypes.add(TSDataType.INT64.ordinal());
-
-    MeasurementMNode[] measurementMNodes = new MeasurementMNode[2];
-    measurementMNodes[0] = new MeasurementMNode(null, "s0",
-        new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.PLAIN), null);
-    measurementMNodes[1] = new MeasurementMNode(null, "s1",
-        new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null);
-
-    InsertTabletPlan insertTabletPlan1 = new InsertTabletPlan(new 
PartialPath("root.vehicle.d0"), measurements,
-        dataTypes);
-
-    long[] times = new long[1200];
-    Object[] columns = new Object[2];
-    columns[0] = new int[1200];
-    columns[1] = new long[1200];
-
-    for (int r = 0; r < 1200; r++) {
-      times[r] = r;
-      ((int[]) columns[0])[r] = 1;
-      ((long[]) columns[1])[r] = 1;
-    }
-    insertTabletPlan1.setTimes(times);
-    insertTabletPlan1.setColumns(columns);
-    insertTabletPlan1.setRowCount(times.length);
-    insertTabletPlan1.setMeasurementMNodes(measurementMNodes);
-
-    processor.insertTablet(insertTabletPlan1);
-    processor.asyncCloseAllWorkingTsFileProcessors();
-
-    InsertTabletPlan insertTabletPlan2 = new InsertTabletPlan(new 
PartialPath("root.vehicle.d0"), measurements,
-        dataTypes);
-
-    for (int r = 1249; r >= 50; r--) {
-      times[r - 50] = r;
-      ((int[]) columns[0])[r - 50] = 1;
-      ((long[]) columns[1])[r - 50] = 1;
-    }
-    insertTabletPlan2.setTimes(times);
-    insertTabletPlan2.setColumns(columns);
-    insertTabletPlan2.setRowCount(times.length);
-    insertTabletPlan2.setMeasurementMNodes(measurementMNodes);
-
-    processor.insertTablet(insertTabletPlan2);
-    processor.asyncCloseAllWorkingTsFileProcessors();
-    processor.syncCloseAllWorkingTsFileProcessors();
-
-    for (TsFileProcessor tsfileProcessor : 
processor.getWorkUnsequenceTsFileProcessor()) {
-      tsfileProcessor.syncFlush();
-    }
-
-    QueryDataSource queryDataSource = processor.query(new 
PartialPath(deviceId), measurementId, context,
-        null, null);
-
-    Assert.assertEquals(2, queryDataSource.getSeqResources().size());
-    Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
-    for (TsFileResource resource : queryDataSource.getSeqResources()) {
-      Assert.assertTrue(resource.isClosed());
-    }
-
-    config.setEnableDiscardOutOfOrderData(defaultEnableDiscard);
-    config.setPartitionInterval(defaultTimePartition);
-    config.setEnablePartition(defaultEnablePartition);
-  }
-
-  @Test
-  public void testMerge() throws WriteProcessException, QueryProcessException, 
IllegalPathException {
-
-    mergeLock = new AtomicLong(0);
+  public void testMerge()
+      throws WriteProcessException, QueryProcessException, 
IllegalPathException {
     for (int j = 21; j <= 30; j++) {
       TSRecord record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, 
String.valueOf(j)));
@@ -570,12 +289,13 @@ public class StorageGroupProcessorTest {
 
     processor.syncCloseAllWorkingTsFileProcessors();
     processor.merge(true);
-    while (mergeLock.get() == 0) {
+    while (processor.tsFileManagement.isUnseqMerging) {
       // wait
     }
 
-    QueryDataSource queryDataSource = processor.query(new 
PartialPath(deviceId), measurementId, context,
-        null, null);
+    QueryDataSource queryDataSource = processor
+        .query(new PartialPath(deviceId), measurementId, context,
+            null, null);
     Assert.assertEquals(10, queryDataSource.getSeqResources().size());
     Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
     for (TsFileResource resource : queryDataSource.getSeqResources()) {
@@ -592,12 +312,5 @@ public class StorageGroupProcessorTest {
       super(systemInfoDir, storageGroupName, new 
TsFileFlushPolicy.DirectFlushPolicy());
     }
 
-    @Override
-    protected void mergeEndAction(List<TsFileResource> seqFiles, 
List<TsFileResource> unseqFiles,
-        File mergeLog) {
-      super.mergeEndAction(seqFiles, unseqFiles, mergeLog);
-      mergeLock.incrementAndGet();
-      assertFalse(mergeLog.exists());
-    }
   }
 }
\ No newline at end of file
diff --git 
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java 
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
index d058ba5..f65130d 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
@@ -294,7 +294,7 @@ public class IoTDBMergeTest {
       }
       // it is uncertain whether the sub tasks are created at this time point, 
and we are only
       // sure that the main task is created
-      assertEquals(0, cnt);
+      assertEquals(1, cnt);
     }
   }
 }

Reply via email to