This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch refactor_filenode2
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/refactor_filenode2 by this 
push:
     new 15b1385  temporary commit
15b1385 is described below

commit 15b138579561d129be66be87b938e8bf658d597e
Author: xiangdong huang <[email protected]>
AuthorDate: Sat Mar 9 19:05:33 2019 +0800

    temporary commit
---
 .../engine/bufferwrite/BufferWriteProcessor.java   |   22 +-
 .../db/engine/filenode/FileNodeProcessor2.java     | 1740 +-------------------
 .../db/engine/filenode/OverflowChangeType.java     |   12 +-
 .../engine/filenode/FileNodeFlushFutureTest.java   |  168 ++
 4 files changed, 218 insertions(+), 1724 deletions(-)

diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index dc56312..c0ca854 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -67,9 +67,9 @@ public class BufferWriteProcessor extends Processor {
   private long memThreshold = 
TSFileDescriptor.getInstance().getConfig().groupSizeInByte;
   private IMemTable workMemTable;
   private IMemTable flushMemTable;
-  private Action bufferwriteFlushAction;
-  private Action bufferwriteCloseAction;
-  private Action filenodeFlushAction;
+  private Action preFlushAction;
+  private Action closeAction;
+  private Action flushAction;
 
   //lastFlushTime time unit: nanosecond
   private long lastFlushTime = -1;
@@ -119,9 +119,9 @@ public class BufferWriteProcessor extends Processor {
       throw new BufferWriteProcessorException(e);
     }
 
-    bufferwriteFlushAction = 
parameters.get(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION);
-    bufferwriteCloseAction = 
parameters.get(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION);
-    filenodeFlushAction = 
parameters.get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
+    preFlushAction = 
parameters.get(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION);
+    closeAction = parameters.get(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION);
+    flushAction = 
parameters.get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
     workMemTable = new PrimitiveMemTable();
 
     if (IoTDBDescriptor.getInstance().getConfig().enableWal) {
@@ -285,14 +285,14 @@ public class BufferWriteProcessor extends Processor {
         writer.flush();
       }
 
-      filenodeFlushAction.act();
+      flushAction.act();
       if (IoTDBDescriptor.getInstance().getConfig().enableWal) {
         logNode.notifyEndFlush(null);
       }
       result = true;
     } catch (Exception e) {
       LOGGER.error(
-          "The bufferwrite processor {} failed to flush {}, when calling the 
filenodeFlushAction.",
+          "The bufferwrite processor {} failed to flush {}, when calling the 
flushAction.",
           getProcessorName(), displayMessage, e);
       result = false;
     } finally {
@@ -342,7 +342,7 @@ public class BufferWriteProcessor extends Processor {
       }
       // update the lastUpdatetime, prepare for flush
       try {
-        bufferwriteFlushAction.act();
+        preFlushAction.act();
       } catch (Exception e) {
         LOGGER.error("Failed to flush bufferwrite row group when calling the 
action function.");
         throw new IOException(e);
@@ -378,9 +378,9 @@ public class BufferWriteProcessor extends Processor {
       // end file
       writer.endFile(fileSchema);
       // update the IntervalFile for interval list
-      bufferwriteCloseAction.act();
+      closeAction.act();
       // flush the changed information for filenode
-      filenodeFlushAction.act();
+      flushAction.act();
       // delete the restore for this bufferwrite processor
       if (LOGGER.isInfoEnabled()) {
         long closeEndTime = System.currentTimeMillis();
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor2.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor2.java
index 311d04a..a63f247 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor2.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor2.java
@@ -112,126 +112,7 @@ public class FileNodeProcessor2 extends Processor 
implements IStatistic {
   private static final IoTDBConfig TsFileDBConf = 
IoTDBDescriptor.getInstance().getConfig();
   private static final MManager mManager = MManager.getInstance();
   private static final Directories directories = Directories.getInstance();
-  private final String statStorageDeltaName;
-  private final HashMap<String, AtomicLong> statParamsHashMap = new 
HashMap<>();
-  /**
-   * Used to keep the oldest timestamp for each deviceId. The key is deviceId.
-   */
-  private volatile boolean isOverflowed;
-  private Map<String, Long> lastUpdateTimeMap;
-  private Map<String, Long> flushLastUpdateTimeMap;
-  private Map<String, List<IntervalFileNode>> invertedIndexOfFiles;
-  private IntervalFileNode emptyIntervalFileNode;
-  private IntervalFileNode currentIntervalFileNode;
-  private List<IntervalFileNode> newFileNodes;
-  private FileNodeProcessorStatus isMerging;
-  // this is used when work->merge operation
-  private int numOfMergeFile;
-  private FileNodeProcessorStore fileNodeProcessorStore;
-  private String fileNodeRestoreFilePath;
-  private final Object fileNodeRestoreLock = new Object();
-  private String baseDirPath;
-  // last merge time
-  private long lastMergeTime = -1;
-  private BufferWriteProcessor bufferWriteProcessor = null;
-  private OverflowProcessor overflowProcessor = null;
-  private Set<Integer> oldMultiPassTokenSet = null;
-  private Set<Integer> newMultiPassTokenSet = new HashSet<>();
-  private ReadWriteLock oldMultiPassLock = null;
-  private ReadWriteLock newMultiPassLock = new ReentrantReadWriteLock(false);
-  // system recovery
-  private boolean shouldRecovery = false;
-  // statistic monitor parameters
-  private Map<String, Action> parameters;
-
-  private FileSchema fileSchema;
 
-  /**
-   * used for saving fileNodeProcessorStore on disk.
-   */
-  private Action flushFileNodeProcessorAction = () -> {
-    synchronized (fileNodeProcessorStore) {
-      try {
-        writeStoreToDisk(fileNodeProcessorStore);
-      } catch (FileNodeProcessorException e) {
-        throw new ActionException(e);
-      }
-    }
-  };
-  /**
-   * used for updating flushLastUpdateTimeMap as lastUpdateTimeMap.value()+1,
-   * and updating lastUpdateTimeMap into fileNodeProcessorStore
-   */
-  private Action bufferwriteFlushAction = () -> {
-    // update the lastUpdateTime Notice: Thread safe
-    synchronized (fileNodeProcessorStore) {
-      // deep copy
-      Map<String, Long> tempLastUpdateMap = new HashMap<>(lastUpdateTimeMap);
-      // update flushLastUpdateTimeMap
-      for (Entry<String, Long> entry : lastUpdateTimeMap.entrySet()) {
-        flushLastUpdateTimeMap.put(entry.getKey(), entry.getValue() + 1);
-      }
-      fileNodeProcessorStore.setLastUpdateTimeMap(tempLastUpdateMap);
-    }
-  };
-
-  /**
-   *
-   */
-  private Action bufferwriteCloseAction = new Action() {
-
-    @Override
-    public void act() {
-      synchronized (fileNodeProcessorStore) {
-        //why do not deep copy?
-        fileNodeProcessorStore.setLastUpdateTimeMap(lastUpdateTimeMap);
-        addLastTimeToIntervalFile();
-        fileNodeProcessorStore.setNewFileNodes(newFileNodes);
-      }
-    }
-
-    /**
-     * modify the endTimeMap of currentIntervalFileNode
-     */
-    private void addLastTimeToIntervalFile() {
-
-      if (!newFileNodes.isEmpty()) {
-        // end time with one start time
-        Map<String, Long> endTimeMap = new HashMap<>();
-        for (Entry<String, Long> startTime : 
currentIntervalFileNode.getStartTimeMap().entrySet()) {
-          String deviceId = startTime.getKey();
-          endTimeMap.put(deviceId, lastUpdateTimeMap.get(deviceId));
-        }
-        currentIntervalFileNode.setEndTimeMap(endTimeMap);
-      }
-    }
-  };
-  private Action overflowFlushAction = () -> {
-
-    // update the new IntervalFileNode List and emptyIntervalFile.
-    // Notice: thread safe
-    synchronized (fileNodeProcessorStore) {
-      fileNodeProcessorStore.setOverflowed(isOverflowed);
-      fileNodeProcessorStore.setEmptyIntervalFileNode(emptyIntervalFileNode);
-      fileNodeProcessorStore.setNewFileNodes(newFileNodes);
-    }
-  };
-  // Token for query which used to
-  private int multiPassLockToken = 0;
-  private VersionController versionController;
-  private ReentrantLock mergeDeleteLock = new ReentrantLock();
-
-  /**
-   * This is the modification file of the result of the current merge.
-   */
-  private ModificationFile mergingModification;
-
-  private TsFileIOWriter mergeFileWriter = null;
-  private String mergeOutputPath = null;
-  private String mergeBaseDir = null;
-  private String mergeFileName = null;
-  private boolean mergeIsChunkGroupHasData = false;
-  private long mergeStartPos;
 
   /**
    * constructor of FileNodeProcessor.
@@ -239,190 +120,32 @@ public class FileNodeProcessor2 extends Processor 
implements IStatistic {
   FileNodeProcessor2(String fileNodeDirPath, String processorName)
       throws FileNodeProcessorException {
     super(processorName);
-    for (MonitorConstants.FileNodeProcessorStatConstants statConstant :
-        MonitorConstants.FileNodeProcessorStatConstants.values()) {
-      statParamsHashMap.put(statConstant.name(), new AtomicLong(0));
-    }
-    statStorageDeltaName =
-        MonitorConstants.STAT_STORAGE_GROUP_PREFIX + 
MonitorConstants.MONITOR_PATH_SEPERATOR
-            + MonitorConstants.FILE_NODE_PATH + 
MonitorConstants.MONITOR_PATH_SEPERATOR
-            + processorName.replaceAll("\\.", "_");
-
-    this.parameters = new HashMap<>();
-    String dirPath = fileNodeDirPath;
-    if (dirPath.length() > 0
-        && dirPath.charAt(dirPath.length() - 1) != File.separatorChar) {
-      dirPath = dirPath + File.separatorChar;
-    }
-    this.baseDirPath = dirPath + processorName;
-    File dataDir = new File(this.baseDirPath);
-    if (!dataDir.exists()) {
-      dataDir.mkdirs();
-      LOGGER.info(
-          "The data directory of the filenode processor {} doesn't exist. 
Create new " +
-              "directory {}",
-          getProcessorName(), baseDirPath);
-    }
-    fileNodeRestoreFilePath = new File(dataDir, processorName + 
RESTORE_FILE_SUFFIX).getPath();
-    try {
-      fileNodeProcessorStore = readStoreFromDisk();
-    } catch (FileNodeProcessorException e) {
-      LOGGER.error(
-          "The fileNode processor {} encountered an error when recoverying 
restore " +
-              "information.",
-          processorName, e);
-      throw new FileNodeProcessorException(e);
-    }
-    // TODO deep clone the lastupdate time
-    lastUpdateTimeMap = fileNodeProcessorStore.getLastUpdateTimeMap();
-    emptyIntervalFileNode = fileNodeProcessorStore.getEmptyIntervalFileNode();
-    newFileNodes = fileNodeProcessorStore.getNewFileNodes();
-    isMerging = fileNodeProcessorStore.getFileNodeProcessorStatus();
-    numOfMergeFile = fileNodeProcessorStore.getNumOfMergeFile();
-    invertedIndexOfFiles = new HashMap<>();
-    // deep clone
-    flushLastUpdateTimeMap = new HashMap<>();
-    for (Entry<String, Long> entry : lastUpdateTimeMap.entrySet()) {
-      flushLastUpdateTimeMap.put(entry.getKey(), entry.getValue() + 1);
-    }
-    // construct the fileschema
-    try {
-      this.fileSchema = constructFileSchema(processorName);
-    } catch (WriteProcessException e) {
-      throw new FileNodeProcessorException(e);
-    }
-    // status is not NONE, or the last intervalFile is not closed
-    if (isMerging != FileNodeProcessorStatus.NONE
-        || (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() - 
1).isClosed())) {
-      shouldRecovery = true;
-    } else {
-      // add file into the index of file
-      addAllFileIntoIndex(newFileNodes);
-    }
-    // RegistStatService
-    if (TsFileDBConf.enableStatMonitor) {
-      StatMonitor statMonitor = StatMonitor.getInstance();
-      registStatMetadata();
-      statMonitor.registStatistics(statStorageDeltaName, this);
-    }
-    try {
-      versionController = new SimpleFileVersionController(fileNodeDirPath);
-    } catch (IOException e) {
-      throw new FileNodeProcessorException(e);
-    }
+
   }
 
   @Override
   public Map<String, AtomicLong> getStatParamsHashMap() {
-    return statParamsHashMap;
+    return null;
   }
 
   @Override
   public void registStatMetadata() {
-    Map<String, String> hashMap = new HashMap<>();
-    for (MonitorConstants.FileNodeProcessorStatConstants statConstant :
-        MonitorConstants.FileNodeProcessorStatConstants.values()) {
-      hashMap
-          .put(statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPERATOR 
+ statConstant.name(),
-              MonitorConstants.DATA_TYPE);
-    }
-    StatMonitor.getInstance().registStatStorageGroup(hashMap);
   }
 
   @Override
   public List<String> getAllPathForStatistic() {
-    List<String> list = new ArrayList<>();
-    for (MonitorConstants.FileNodeProcessorStatConstants statConstant :
-        MonitorConstants.FileNodeProcessorStatConstants.values()) {
-      list.add(
-          statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPERATOR + 
statConstant.name());
-    }
-    return list;
+
   }
 
   @Override
   public Map<String, TSRecord> getAllStatisticsValue() {
-    Long curTime = System.currentTimeMillis();
-    HashMap<String, TSRecord> tsRecordHashMap = new HashMap<>();
-    TSRecord tsRecord = new TSRecord(curTime, statStorageDeltaName);
-
-    Map<String, AtomicLong> hashMap = getStatParamsHashMap();
-    tsRecord.dataPointList = new ArrayList<>();
-    for (Entry<String, AtomicLong> entry : hashMap.entrySet()) {
-      tsRecord.dataPointList.add(new LongDataPoint(entry.getKey(), 
entry.getValue().get()));
-    }
-
-    tsRecordHashMap.put(statStorageDeltaName, tsRecord);
-    return tsRecordHashMap;
-  }
 
-  /**
-   * add interval FileNode.
-   */
-  void addIntervalFileNode(String baseDir, String fileName) throws 
ActionException {
-
-    IntervalFileNode intervalFileNode = new 
IntervalFileNode(OverflowChangeType.NO_CHANGE, baseDir,
-        fileName);
-    this.currentIntervalFileNode = intervalFileNode;
-    newFileNodes.add(intervalFileNode);
-    fileNodeProcessorStore.setNewFileNodes(newFileNodes);
-    flushFileNodeProcessorAction.act();
   }
 
-  /**
-   * set interval filenode start time.
-   *
-   * @param deviceId device ID
-   */
-  void setIntervalFileNodeStartTime(String deviceId) {
-    if (currentIntervalFileNode.getStartTime(deviceId) == -1) {
-      currentIntervalFileNode.setStartTime(deviceId, 
flushLastUpdateTimeMap.get(deviceId));
-      if (!invertedIndexOfFiles.containsKey(deviceId)) {
-        invertedIndexOfFiles.put(deviceId, new ArrayList<>());
-      }
-      invertedIndexOfFiles.get(deviceId).add(currentIntervalFileNode);
-    }
-  }
-
-  /**
-   * clear filenode.
-   */
-  public void clearFileNode() {
-    isOverflowed = false;
-    emptyIntervalFileNode = new IntervalFileNode(OverflowChangeType.NO_CHANGE, 
null);
-    newFileNodes = new ArrayList<>();
-    isMerging = FileNodeProcessorStatus.NONE;
-    numOfMergeFile = 0;
-    fileNodeProcessorStore.setLastUpdateTimeMap(lastUpdateTimeMap);
-    fileNodeProcessorStore.setFileNodeProcessorStatus(isMerging);
-    fileNodeProcessorStore.setNewFileNodes(newFileNodes);
-    fileNodeProcessorStore.setNumOfMergeFile(numOfMergeFile);
-    fileNodeProcessorStore.setEmptyIntervalFileNode(emptyIntervalFileNode);
-  }
 
-  private void addAllFileIntoIndex(List<IntervalFileNode> fileList) {
-    // clear map
-    invertedIndexOfFiles.clear();
-    // add all file to index
-    for (IntervalFileNode fileNode : fileList) {
-      if (fileNode.getStartTimeMap().isEmpty()) {
-        continue;
-      }
-      for (String deviceId : fileNode.getStartTimeMap().keySet()) {
-        if (!invertedIndexOfFiles.containsKey(deviceId)) {
-          invertedIndexOfFiles.put(deviceId, new ArrayList<>());
-        }
-        invertedIndexOfFiles.get(deviceId).add(fileNode);
-      }
-    }
-  }
-
-  public boolean shouldRecovery() {
-    return shouldRecovery;
-  }
 
   public boolean isOverflowed() {
-    return isOverflowed;
+
   }
 
   /**
@@ -430,329 +153,70 @@ public class FileNodeProcessor2 extends Processor 
implements IStatistic {
    * <code>isOverflowed</code> to true.
    */
   public void setOverflowed(boolean isOverflowed) {
-    if (this.isOverflowed != isOverflowed) {
-      this.isOverflowed = isOverflowed;
-    }
+
   }
 
   public FileNodeProcessorStatus getFileNodeProcessorStatus() {
-    return isMerging;
-  }
 
-  /**
-   * execute filenode recovery.
-   */
-  public void fileNodeRecovery() throws FileNodeProcessorException {
-    // restore bufferwrite
-    if (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() - 
1).isClosed()) {
-      //
-      // add the current file
-      //
-      currentIntervalFileNode = newFileNodes.get(newFileNodes.size() - 1);
-
-      // this bufferwrite file is not close by normal operation
-      String damagedFilePath = newFileNodes.get(newFileNodes.size() - 
1).getFilePath();
-      String[] fileNames = damagedFilePath.split("\\" + File.separator);
-      // all information to recovery the damaged file.
-      // contains file seriesPath, action parameters and processorName
-      parameters.put(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION, 
bufferwriteFlushAction);
-      parameters.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION, 
bufferwriteCloseAction);
-      parameters
-          .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, 
flushFileNodeProcessorAction);
-      String baseDir = directories
-          .getTsFileFolder(newFileNodes.get(newFileNodes.size() - 
1).getBaseDirIndex());
-      if (LOGGER.isInfoEnabled()) {
-        LOGGER.info(
-            "The filenode processor {} will recovery the bufferwrite 
processor, "
-                + "the bufferwrite file is {}",
-            getProcessorName(), fileNames[fileNames.length - 1]);
-      }
-
-      try {
-        bufferWriteProcessor = new BufferWriteProcessor(baseDir, 
getProcessorName(),
-            fileNames[fileNames.length - 1], parameters, versionController, 
fileSchema);
-      } catch (BufferWriteProcessorException e) {
-        LOGGER.error(
-            "The filenode processor {} failed to recovery the bufferwrite 
processor, "
-                + "the last bufferwrite file is {}.",
-            getProcessorName(), fileNames[fileNames.length - 1]);
-        throw new FileNodeProcessorException(e);
-      }
-    }
-    // restore the overflow processor
-    LOGGER.info("The filenode processor {} will recovery the overflow 
processor.",
-        getProcessorName());
-    parameters.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, 
overflowFlushAction);
-    parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, 
flushFileNodeProcessorAction);
-    try {
-      overflowProcessor = new OverflowProcessor(getProcessorName(), 
parameters, fileSchema,
-          versionController);
-    } catch (IOException e) {
-      LOGGER.error("The filenode processor {} failed to recovery the overflow 
processor.",
-          getProcessorName());
-      throw new FileNodeProcessorException(e);
-    }
-
-    shouldRecovery = false;
-
-    if (isMerging == FileNodeProcessorStatus.MERGING_WRITE) {
-      // re-merge all file
-      // if bufferwrite processor is not null, and close
-      LOGGER.info("The filenode processor {} is recovering, the filenode 
status is {}.",
-          getProcessorName(), isMerging);
-      merge();
-    } else if (isMerging == FileNodeProcessorStatus.WAITING) {
-      // unlock
-      LOGGER.info("The filenode processor {} is recovering, the filenode 
status is {}.",
-          getProcessorName(), isMerging);
-      //writeUnlock();
-      switchWaitingToWorking();
-    } else {
-      //writeUnlock();
-    }
-    // add file into index of file
-    addAllFileIntoIndex(newFileNodes);
   }
 
+
+
   /**
    * get buffer write processor by processor name and insert time.
    */
   public BufferWriteProcessor getBufferWriteProcessor(String processorName, 
long insertTime)
       throws FileNodeProcessorException {
-    if (bufferWriteProcessor == null) {
-      Map<String, Action> params = new HashMap<>();
-      params.put(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION, 
bufferwriteFlushAction);
-      params.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION, 
bufferwriteCloseAction);
-      params
-          .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, 
flushFileNodeProcessorAction);
-      String baseDir = directories.getNextFolderForTsfile();
-      LOGGER.info("Allocate folder {} for the new bufferwrite processor.", 
baseDir);
-      // construct processor or restore
-      try {
-        bufferWriteProcessor = new BufferWriteProcessor(baseDir, processorName,
-            insertTime + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR
-                + System.currentTimeMillis(),
-            params, versionController, fileSchema);
-      } catch (BufferWriteProcessorException e) {
-        LOGGER.error("The filenode processor {} failed to get the bufferwrite 
processor.",
-            processorName, e);
-        throw new FileNodeProcessorException(e);
-      }
-    }
-    return bufferWriteProcessor;
+
   }
 
   /**
    * get buffer write processor.
    */
   public BufferWriteProcessor getBufferWriteProcessor() throws 
FileNodeProcessorException {
-    if (bufferWriteProcessor == null) {
-      LOGGER.error("The bufferwrite processor is null when get the 
bufferwriteProcessor");
-      throw new FileNodeProcessorException("The bufferwrite processor is 
null");
-    }
-    return bufferWriteProcessor;
+
   }
 
   /**
    * get overflow processor by processor name.
    */
   public OverflowProcessor getOverflowProcessor(String processorName) throws 
IOException {
-    if (overflowProcessor == null) {
-      Map<String, Action> params = new HashMap<>();
-      // construct processor or restore
-      params.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
-      params
-          .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, 
flushFileNodeProcessorAction);
-      overflowProcessor = new OverflowProcessor(processorName, params, 
fileSchema,
-          versionController);
-    }
-    return overflowProcessor;
+
   }
 
   /**
    * get overflow processor.
    */
   public OverflowProcessor getOverflowProcessor() {
-    if (overflowProcessor == null) {
-      LOGGER.error("The overflow processor is null when getting the 
overflowProcessor");
-    }
-    return overflowProcessor;
+
   }
 
   boolean hasOverflowProcessor() {
-    return overflowProcessor != null;
+
   }
 
   public void setBufferwriteProcessroToClosed() {
 
-    bufferWriteProcessor = null;
   }
 
   public boolean hasBufferwriteProcessor() {
 
-    return bufferWriteProcessor != null;
   }
 
-  /**
-   * set last update time.
-   */
-  void setLastUpdateTime(String deviceId, long timestamp) {
-    if (!lastUpdateTimeMap.containsKey(deviceId) || 
lastUpdateTimeMap.get(deviceId) < timestamp) {
-      lastUpdateTimeMap.put(deviceId, timestamp);
-    }
-  }
 
-  /**
-   * get last update time.
-   */
-  long getLastUpdateTime(String deviceId) {
-
-    if (lastUpdateTimeMap.containsKey(deviceId)) {
-      return lastUpdateTimeMap.get(deviceId);
-    } else {
-      return -1;
-    }
-  }
-
-  /**
-   * get flush last update time.
-   */
-  long getFlushLastUpdateTime(String deviceId) {
-    if (!flushLastUpdateTimeMap.containsKey(deviceId)) {
-      flushLastUpdateTimeMap.put(deviceId, 0L);
-    }
-    return flushLastUpdateTimeMap.get(deviceId);
-  }
-
-
-  /**
-   * For insert overflow.
-   */
-  void changeTypeToChanged(String deviceId, long timestamp) {
-    if (!invertedIndexOfFiles.containsKey(deviceId)) {
-      LOGGER.warn(
-          WARN_NO_SUCH_OVERFLOWED_FILE
-              + "the data is [device:{},time:{}]",
-          getProcessorName(), deviceId, timestamp);
-      emptyIntervalFileNode.setStartTime(deviceId, 0L);
-      emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId));
-      emptyIntervalFileNode.changeTypeToChanged(isMerging);
-    } else {
-      List<IntervalFileNode> temp = invertedIndexOfFiles.get(deviceId);
-      int index = searchIndexNodeByTimestamp(deviceId, timestamp, temp);
-      changeTypeToChanged(temp.get(index), deviceId);
-    }
-  }
-
-  private void changeTypeToChanged(IntervalFileNode fileNode, String deviceId) 
{
-    fileNode.changeTypeToChanged(isMerging);
-    if (isMerging == FileNodeProcessorStatus.MERGING_WRITE) {
-      fileNode.addMergeChanged(deviceId);
-    }
-  }
-
-  /**
-   * For update overflow.
-   */
-  public void changeTypeToChanged(String deviceId, long startTime, long 
endTime) {
-    if (!invertedIndexOfFiles.containsKey(deviceId)) {
-      LOGGER.warn(
-          WARN_NO_SUCH_OVERFLOWED_FILE
-              + "the data is [device:{}, start time:{}, end time:{}]",
-          getProcessorName(), deviceId, startTime, endTime);
-      emptyIntervalFileNode.setStartTime(deviceId, 0L);
-      emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId));
-      emptyIntervalFileNode.changeTypeToChanged(isMerging);
-    } else {
-      List<IntervalFileNode> temp = invertedIndexOfFiles.get(deviceId);
-      int left = searchIndexNodeByTimestamp(deviceId, startTime, temp);
-      int right = searchIndexNodeByTimestamp(deviceId, endTime, temp);
-      for (int i = left; i <= right; i++) {
-        changeTypeToChanged(temp.get(i), deviceId);
-      }
-    }
-  }
-
-  /**
-   * For delete overflow.
-   */
-  public void changeTypeToChangedForDelete(String deviceId, long timestamp) {
-    if (!invertedIndexOfFiles.containsKey(deviceId)) {
-      LOGGER.warn(
-          WARN_NO_SUCH_OVERFLOWED_FILE
-              + "the data is [device:{}, delete time:{}]",
-          getProcessorName(), deviceId, timestamp);
-      emptyIntervalFileNode.setStartTime(deviceId, 0L);
-      emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId));
-      emptyIntervalFileNode.changeTypeToChanged(isMerging);
-    } else {
-      List<IntervalFileNode> temp = invertedIndexOfFiles.get(deviceId);
-      int index = searchIndexNodeByTimestamp(deviceId, timestamp, temp);
-      for (int i = 0; i <= index; i++) {
-        temp.get(i).changeTypeToChanged(isMerging);
-        if (isMerging == FileNodeProcessorStatus.MERGING_WRITE) {
-          temp.get(i).addMergeChanged(deviceId);
-        }
-      }
-    }
-  }
-
-  /**
-   * Search the index of the interval by the timestamp.
-   *
-   * @return index of interval
-   */
-  private int searchIndexNodeByTimestamp(String deviceId, long timestamp,
-      List<IntervalFileNode> fileList) {
-    int index = 1;
-    while (index < fileList.size()) {
-      if (timestamp < fileList.get(index).getStartTime(deviceId)) {
-        break;
-      } else {
-        index++;
-      }
-    }
-    return index - 1;
-  }
 
   /**
    * add multiple pass lock.
    */
   public int addMultiPassLock() {
-    LOGGER.debug("Add MultiPassLock: read lock newMultiPassLock.");
-    newMultiPassLock.readLock().lock();
-    while (newMultiPassTokenSet.contains(multiPassLockToken)) {
-      multiPassLockToken++;
-    }
-    newMultiPassTokenSet.add(multiPassLockToken);
-    LOGGER.debug("Add multi token:{}, nsPath:{}.", multiPassLockToken, 
getProcessorName());
-    return multiPassLockToken;
+
   }
 
   /**
    * remove multiple pass lock. TODO: use the return value or remove it.
    */
   public boolean removeMultiPassLock(int token) {
-    if (newMultiPassTokenSet.contains(token)) {
-      newMultiPassLock.readLock().unlock();
-      newMultiPassTokenSet.remove(token);
-      LOGGER.debug("Remove multi token:{}, nspath:{}, new set:{}, lock:{}", 
token,
-              getProcessorName(),
-              newMultiPassTokenSet, newMultiPassLock);
-      return true;
-    } else if (oldMultiPassTokenSet != null && 
oldMultiPassTokenSet.contains(token)) {
-      // remove token first, then unlock
-      oldMultiPassLock.readLock().unlock();
-      oldMultiPassTokenSet.remove(token);
-      LOGGER.debug("Remove multi token:{}, old set:{}, lock:{}", token, 
oldMultiPassTokenSet,
-          oldMultiPassLock);
-      return true;
-    } else {
-      LOGGER.error("remove token error:{},new set:{}, old set:{}", token, 
newMultiPassTokenSet,
-          oldMultiPassTokenSet);
-      // should add throw exception
-      return false;
-    }
+
   }
 
   /**
@@ -760,117 +224,10 @@ public class FileNodeProcessor2 extends Processor 
implements IStatistic {
    */
   public <T extends Comparable<T>> QueryDataSource query(String deviceId, 
String measurementId,
       QueryContext context) throws FileNodeProcessorException {
-    // query overflow data
-    MeasurementSchema mSchema;
-    TSDataType dataType;
-
-    //mSchema = mManager.getSchemaForOnePath(deviceId + "." + measurementId);
-    mSchema = fileSchema.getMeasurementSchema(measurementId);
-    dataType = mSchema.getType();
-
-    OverflowSeriesDataSource overflowSeriesDataSource;
-    try {
-      overflowSeriesDataSource = overflowProcessor.query(deviceId, 
measurementId, dataType,
-          mSchema.getProps(), context);
-    } catch (IOException e) {
-      throw new FileNodeProcessorException(e);
-    }
-    // tsfile dataØØ
-    List<IntervalFileNode> bufferwriteDataInFiles = new ArrayList<>();
-    for (IntervalFileNode intervalFileNode : newFileNodes) {
-      // add the same intervalFileNode, but not the same reference
-      if (intervalFileNode.isClosed()) {
-        bufferwriteDataInFiles.add(intervalFileNode.backUp());
-      }
-    }
-    Pair<ReadOnlyMemChunk, List<ChunkMetaData>> bufferwritedata = new 
Pair<>(null, null);
-    // bufferwrite data
-    UnsealedTsFile unsealedTsFile = null;
-
-    if (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() - 
1).isClosed()
-        && !newFileNodes.get(newFileNodes.size() - 
1).getStartTimeMap().isEmpty()) {
-      unsealedTsFile = new UnsealedTsFile();
-      unsealedTsFile.setFilePath(newFileNodes.get(newFileNodes.size() - 
1).getFilePath());
-      if (bufferWriteProcessor == null) {
-        LOGGER.error(
-            "The last of tsfile {} in filenode processor {} is not closed, "
-                + "but the bufferwrite processor is null.",
-            newFileNodes.get(newFileNodes.size() - 1).getRelativePath(), 
getProcessorName());
-        throw new FileNodeProcessorException(String.format(
-            "The last of tsfile %s in filenode processor %s is not closed, "
-                + "but the bufferwrite processor is null.",
-            newFileNodes.get(newFileNodes.size() - 1).getRelativePath(), 
getProcessorName()));
-      }
-      bufferwritedata = bufferWriteProcessor
-          .queryBufferWriteData(deviceId, measurementId, dataType, 
mSchema.getProps());
-
-      try {
-        List<Modification> pathModifications = context.getPathModifications(
-            currentIntervalFileNode.getModFile(), deviceId
-                + IoTDBConstant.PATH_SEPARATOR + measurementId
-        );
-        if (!pathModifications.isEmpty()) {
-          QueryUtils.modifyChunkMetaData(bufferwritedata.right, 
pathModifications);
-        }
-      } catch (IOException e) {
-        throw new FileNodeProcessorException(e);
-      }
-
-      unsealedTsFile.setTimeSeriesChunkMetaDatas(bufferwritedata.right);
-    }
-    GlobalSortedSeriesDataSource globalSortedSeriesDataSource = new 
GlobalSortedSeriesDataSource(
-        new Path(deviceId + "." + measurementId), bufferwriteDataInFiles, 
unsealedTsFile,
-        bufferwritedata.left);
-    return new QueryDataSource(globalSortedSeriesDataSource, 
overflowSeriesDataSource);
 
   }
 
-  /**
-   * append one specified tsfile to this filenode processor.
-   * TODO
-   * @param appendFile the appended tsfile information
-   * @param appendFilePath the seriesPath of appended file
-   */
-  void appendFile(IntervalFileNode appendFile, String appendFilePath)
-      throws FileNodeProcessorException {
-    try {
-      if (!new File(appendFile.getFilePath()).getParentFile().exists()) {
-        new File(appendFile.getFilePath()).getParentFile().mkdirs();
-      }
-      // move file
-      File originFile = new File(appendFilePath);
-      File targetFile = new File(appendFile.getFilePath());
-      if (!originFile.exists()) {
-        throw new FileNodeProcessorException(
-            String.format("The appended file %s does not exist.", 
appendFilePath));
-      }
-      if (targetFile.exists()) {
-        throw new FileNodeProcessorException(
-            String.format("The appended target file %s already exists.",
-                appendFile.getFilePath()));
-      }
-      if (!originFile.renameTo(targetFile)) {
-        LOGGER.warn("File renaming failed when appending new file. Origin: {}, 
target: {}",
-            originFile.getPath(),
-            targetFile.getPath());
-      }
-      // append the new tsfile
-      this.newFileNodes.add(appendFile);
-      // update the lastUpdateTime
-      for (Entry<String, Long> entry : appendFile.getEndTimeMap().entrySet()) {
-        lastUpdateTimeMap.put(entry.getKey(), entry.getValue());
-      }
-      bufferwriteFlushAction.act();
-      fileNodeProcessorStore.setNewFileNodes(newFileNodes);
-      // reconstruct the inverted index of the newFileNodes
-      flushFileNodeProcessorAction.act();
-      addAllFileIntoIndex(newFileNodes);
-    } catch (Exception e) {
-      LOGGER.error("Failed to append the tsfile {} to filenode processor {}.", 
appendFile,
-          getProcessorName());
-      throw new FileNodeProcessorException(e);
-    }
-  }
+
 
   /**
    * get overlap tsfiles which are conflict with the appendFile.
@@ -879,41 +236,12 @@ public class FileNodeProcessor2 extends Processor 
implements IStatistic {
    */
   public List<String> getOverlapFiles(IntervalFileNode appendFile, String uuid)
       throws FileNodeProcessorException {
-    List<String> overlapFiles = new ArrayList<>();
-    try {
-      for (IntervalFileNode intervalFileNode : newFileNodes) {
-        getOverlapFiles(appendFile, intervalFileNode, uuid, overlapFiles);
-      }
-    } catch (IOException e) {
-      LOGGER.error("Failed to get overlap tsfiles which conflict with the 
appendFile.");
-      throw new FileNodeProcessorException(e);
-    }
-    return overlapFiles;
+
   }
 
   private void getOverlapFiles(IntervalFileNode appendFile, IntervalFileNode 
intervalFileNode,
       String uuid, List<String> overlapFiles) throws IOException {
-    for (Entry<String, Long> entry : appendFile.getStartTimeMap().entrySet()) {
-      if (intervalFileNode.getStartTimeMap().containsKey(entry.getKey()) &&
-          intervalFileNode.getEndTime(entry.getKey()) >= entry.getValue()
-          && intervalFileNode.getStartTime(entry.getKey()) <= appendFile
-          .getEndTime(entry.getKey())) {
-        String relativeFilePath = "postback" + File.separator + uuid + 
File.separator + "backup"
-            + File.separator + intervalFileNode.getRelativePath();
-        File newFile = new File(
-            
Directories.getInstance().getTsFileFolder(intervalFileNode.getBaseDirIndex()),
-            relativeFilePath);
-        if (!newFile.getParentFile().exists()) {
-          newFile.getParentFile().mkdirs();
-        }
-        java.nio.file.Path link = 
FileSystems.getDefault().getPath(newFile.getPath());
-        java.nio.file.Path target = FileSystems.getDefault()
-            .getPath(intervalFileNode.getFilePath());
-        Files.createLink(link, target);
-        overlapFiles.add(newFile.getPath());
-        break;
-      }
-    }
+
   }
 
   /**
@@ -921,924 +249,58 @@ public class FileNodeProcessor2 extends Processor 
implements IStatistic {
    */
   public void addTimeSeries(String measurementId, TSDataType dataType, 
TSEncoding encoding,
       CompressionType compressor, Map<String, String> props) {
-    fileSchema.registerMeasurement(new MeasurementSchema(measurementId, 
dataType, encoding,
-        compressor, props));
-  }
 
-  /**
-   * submit the merge task to the <code>MergePool</code>.
-   *
-   * @return null -can't submit the merge task, because this filenode is not 
overflowed or it is
-   * merging now. Future - submit the merge task successfully.
-   */
-  Future submitToMerge() {
-    ZoneId zoneId = IoTDBDescriptor.getInstance().getConfig().getZoneID();
-    if (lastMergeTime > 0) {
-      long thisMergeTime = System.currentTimeMillis();
-      long mergeTimeInterval = thisMergeTime - lastMergeTime;
-      ZonedDateTime lastDateTime = 
ofInstant(Instant.ofEpochMilli(lastMergeTime),
-          zoneId);
-      ZonedDateTime thisDateTime = 
ofInstant(Instant.ofEpochMilli(thisMergeTime),
-          zoneId);
-      LOGGER.info(
-          "The filenode {} last merge time is {}, this merge time is {}, "
-              + "merge time interval is {}s",
-          getProcessorName(), lastDateTime, thisDateTime, mergeTimeInterval / 
1000);
-    }
-    lastMergeTime = System.currentTimeMillis();
-
-    if (overflowProcessor != null) {
-      if (overflowProcessor.getFileSize() < IoTDBDescriptor.getInstance()
-          .getConfig().overflowFileSizeThreshold) {
-        if (LOGGER.isInfoEnabled()) {
-          LOGGER.info(
-              "Skip this merge taks submission, because the size{} of overflow 
processor {} "
-                  + "does not reaches the threshold {}.",
-              MemUtils.bytesCntToStr(overflowProcessor.getFileSize()), 
getProcessorName(),
-              MemUtils.bytesCntToStr(
-                  
IoTDBDescriptor.getInstance().getConfig().overflowFileSizeThreshold));
-        }
-        return null;
-      }
-    } else {
-      LOGGER.info(
-          "Skip this merge taks submission, because the filenode processor {} "
-              + "has no overflow processor.",
-          getProcessorName());
-      return null;
-    }
-    if (isOverflowed && isMerging == FileNodeProcessorStatus.NONE) {
-      Runnable mergeThread;
-      mergeThread = new MergeRunnale();
-      LOGGER.info("Submit the merge task, the merge filenode is {}", 
getProcessorName());
-      return MergeManager.getInstance().submit(mergeThread);
-    } else {
-      if (!isOverflowed) {
-        LOGGER.info(
-            "Skip this merge taks submission, because the filenode processor 
{} is not " +
-                "overflowed.",
-            getProcessorName());
-      } else {
-        LOGGER.warn(
-            "Skip this merge task submission, because last merge task is not 
over yet, "
-                + "the merge filenode processor is {}",
-            getProcessorName());
-      }
-    }
-    return null;
   }
 
-  /**
-   * Prepare for merge, close the bufferwrite and overflow.
-   */
-  private void prepareForMerge() {
-    try {
-      LOGGER.info("The filenode processor {} prepares for merge, closes the 
bufferwrite processor",
-          getProcessorName());
-      closeBufferWrite();
-      // try to get overflow processor
-      getOverflowProcessor(getProcessorName());
-      // must close the overflow processor
-      while (!getOverflowProcessor().canBeClosed()) {
-        waitForClosing();
-      }
-      LOGGER.info("The filenode processor {} prepares for merge, closes the 
overflow processor",
-          getProcessorName());
-      getOverflowProcessor().close();
-    } catch (FileNodeProcessorException | OverflowProcessorException | 
IOException e) {
-      LOGGER.error("The filenode processor {} prepares for merge error.", 
getProcessorName());
-      writeUnlock();
-      throw new ErrorDebugException(e);
-    }
-  }
 
-  private void waitForClosing() {
-    try {
-      LOGGER.info(
-          "The filenode processor {} prepares for merge, the overflow {} can't 
be closed, "
-              + "wait 100ms,",
-          getProcessorName(), getProcessorName());
-      TimeUnit.MICROSECONDS.sleep(100);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
-  }
 
   /**
    * Merge this storage group, merge the tsfile data with overflow data.
    */
   public void merge() throws FileNodeProcessorException {
-    // close bufferwrite and overflow, prepare for merge
-    LOGGER.info("The filenode processor {} begins to merge.", 
getProcessorName());
-    prepareForMerge();
-    // change status from overflowed to no overflowed
-    isOverflowed = false;
-    // change status from work to merge
-    isMerging = FileNodeProcessorStatus.MERGING_WRITE;
-    // check the empty file
-    Map<String, Long> startTimeMap = emptyIntervalFileNode.getStartTimeMap();
-    mergeCheckEmptyFile(startTimeMap);
-
-    for (IntervalFileNode intervalFileNode : newFileNodes) {
-      if (intervalFileNode.getOverflowChangeType() != 
OverflowChangeType.NO_CHANGE) {
-        intervalFileNode.setOverflowChangeType(OverflowChangeType.CHANGED);
-      }
-    }
-
-    addAllFileIntoIndex(newFileNodes);
-    synchronized (fileNodeProcessorStore) {
-      fileNodeProcessorStore.setOverflowed(isOverflowed);
-      fileNodeProcessorStore.setFileNodeProcessorStatus(isMerging);
-      fileNodeProcessorStore.setNewFileNodes(newFileNodes);
-      fileNodeProcessorStore.setEmptyIntervalFileNode(emptyIntervalFileNode);
-      // flush this filenode information
-      try {
-        writeStoreToDisk(fileNodeProcessorStore);
-      } catch (FileNodeProcessorException e) {
-        LOGGER.error("The filenode processor {} writes restore information 
error when merging.",
-            getProcessorName(), e);
-        writeUnlock();
-        throw new FileNodeProcessorException(e);
-      }
-    }
-    // add numOfMergeFile to control the number of the merge file
-    List<IntervalFileNode> backupIntervalFiles;
-
-    backupIntervalFiles = switchFileNodeToMerge();
-    //
-    // clear empty file
-    //
-    boolean needEmtpy = false;
-    if (emptyIntervalFileNode.getOverflowChangeType() != 
OverflowChangeType.NO_CHANGE) {
-      needEmtpy = true;
-    }
-    emptyIntervalFileNode.clear();
-    // attention
-    try {
-      overflowProcessor.switchWorkToMerge();
-    } catch (IOException e) {
-      LOGGER.error("The filenode processor {} can't switch overflow processor 
from work to merge.",
-          getProcessorName(), e);
-      writeUnlock();
-      throw new FileNodeProcessorException(e);
-    }
-    LOGGER.info("The filenode processor {} switches from {} to {}.", 
getProcessorName(),
-        FileNodeProcessorStatus.NONE, FileNodeProcessorStatus.MERGING_WRITE);
-    writeUnlock();
-
-    // query tsfile data and overflow data, and merge them
-    int numOfMergeFiles = 0;
-    int allNeedMergeFiles = backupIntervalFiles.size();
-    for (IntervalFileNode backupIntervalFile : backupIntervalFiles) {
-      numOfMergeFiles++;
-      if (backupIntervalFile.getOverflowChangeType() == 
OverflowChangeType.CHANGED) {
-        // query data and merge
-        String filePathBeforeMerge = backupIntervalFile.getRelativePath();
-        try {
-          LOGGER.info(
-              "The filenode processor {} begins merging the {}/{} tsfile[{}] 
with "
-                  + "overflow file, the process is {}%",
-              getProcessorName(), numOfMergeFiles, allNeedMergeFiles, 
filePathBeforeMerge,
-              (int) (((numOfMergeFiles - 1) / (float) allNeedMergeFiles) * 
100));
-          long startTime = System.currentTimeMillis();
-          String newFile = queryAndWriteDataForMerge(backupIntervalFile);
-          long endTime = System.currentTimeMillis();
-          long timeConsume = endTime - startTime;
-          ZoneId zoneId = 
IoTDBDescriptor.getInstance().getConfig().getZoneID();
-          LOGGER.info(
-              "The fileNode processor {} has merged the {}/{} tsfile[{}->{}] 
over, "
-                  + "start time of merge is {}, end time of merge is {}, "
-                  + "time consumption is {}ms,"
-                  + " the process is {}%",
-              getProcessorName(), numOfMergeFiles, allNeedMergeFiles, 
filePathBeforeMerge,
-              newFile, ofInstant(Instant.ofEpochMilli(startTime),
-                  zoneId), ofInstant(Instant.ofEpochMilli(endTime), zoneId), 
timeConsume,
-              numOfMergeFiles / (float) allNeedMergeFiles * 100);
-        } catch (IOException | PathErrorException e) {
-          LOGGER.error("Merge: query and write data error.", e);
-          throw new FileNodeProcessorException(e);
-        }
-      } else if (backupIntervalFile.getOverflowChangeType() == 
OverflowChangeType.MERGING_CHANGE) {
-        LOGGER.error("The overflowChangeType of backupIntervalFile must not be 
{}",
-            OverflowChangeType.MERGING_CHANGE);
-        // handle this error, throw one runtime exception
-        throw new FileNodeProcessorException(
-            "The overflowChangeType of backupIntervalFile must not be "
-                + OverflowChangeType.MERGING_CHANGE);
-      } else {
-        LOGGER.debug(
-            "The filenode processor {} is merging, the interval file {} 
doesn't "
-                + "need to be merged.",
-            getProcessorName(), backupIntervalFile.getRelativePath());
-      }
-    }
-
-    // change status from merge to wait
-    switchMergeToWaiting(backupIntervalFiles, needEmtpy);
-
-    // change status from wait to work
-    switchWaitingToWorking();
-  }
-
-  private void mergeCheckEmptyFile(Map<String, Long> startTimeMap) {
-    if (emptyIntervalFileNode.getOverflowChangeType() == 
OverflowChangeType.NO_CHANGE) {
-      return;
-    }
-    Iterator<Entry<String, Long>> iterator = 
emptyIntervalFileNode.getEndTimeMap().entrySet()
-        .iterator();
-    while (iterator.hasNext()) {
-      Entry<String, Long> entry = iterator.next();
-      String deviceId = entry.getKey();
-      if (invertedIndexOfFiles.containsKey(deviceId)) {
-        
invertedIndexOfFiles.get(deviceId).get(0).setOverflowChangeType(OverflowChangeType.CHANGED);
-        startTimeMap.remove(deviceId);
-        iterator.remove();
-      }
-    }
-    if (emptyIntervalFileNode.checkEmpty()) {
-      emptyIntervalFileNode.clear();
-    } else {
-      if (!newFileNodes.isEmpty()) {
-        IntervalFileNode first = newFileNodes.get(0);
-        for (String deviceId : 
emptyIntervalFileNode.getStartTimeMap().keySet()) {
-          first.setStartTime(deviceId, 
emptyIntervalFileNode.getStartTime(deviceId));
-          first.setEndTime(deviceId, 
emptyIntervalFileNode.getEndTime(deviceId));
-          first.setOverflowChangeType(OverflowChangeType.CHANGED);
-        }
-        emptyIntervalFileNode.clear();
-      } else {
-        
emptyIntervalFileNode.setOverflowChangeType(OverflowChangeType.CHANGED);
-      }
-    }
-  }
-
-  private List<IntervalFileNode> switchFileNodeToMerge() throws 
FileNodeProcessorException {
-    List<IntervalFileNode> result = new ArrayList<>();
-    if (emptyIntervalFileNode.getOverflowChangeType() != 
OverflowChangeType.NO_CHANGE) {
-      // add empty
-      result.add(emptyIntervalFileNode.backUp());
-      if (!newFileNodes.isEmpty()) {
-        throw new FileNodeProcessorException(
-            String.format("The status of empty file is %s, but the new file 
list is not empty",
-                emptyIntervalFileNode.getOverflowChangeType()));
-      }
-      return result;
-    }
-    if (newFileNodes.isEmpty()) {
-      LOGGER.error("No file was changed when merging, the filenode is {}", 
getProcessorName());
-      throw new FileNodeProcessorException(
-          "No file was changed when merging, the filenode is " + 
getProcessorName());
-    }
-    for (IntervalFileNode intervalFileNode : newFileNodes) {
-      updateFileNode(intervalFileNode, result);
-    }
-    return result;
-  }
-
-  private void updateFileNode(IntervalFileNode intervalFileNode, 
List<IntervalFileNode> result) {
-    if (intervalFileNode.getOverflowChangeType() == 
OverflowChangeType.NO_CHANGE) {
-      result.add(intervalFileNode.backUp());
-    } else {
-      Map<String, Long> startTimeMap = new HashMap<>();
-      Map<String, Long> endTimeMap = new HashMap<>();
-      for (String deviceId : intervalFileNode.getEndTimeMap().keySet()) {
-        List<IntervalFileNode> temp = invertedIndexOfFiles.get(deviceId);
-        int index = temp.indexOf(intervalFileNode);
-        int size = temp.size();
-        // start time
-        if (index == 0) {
-          startTimeMap.put(deviceId, 0L);
-        } else {
-          startTimeMap.put(deviceId, intervalFileNode.getStartTime(deviceId));
-        }
-        // end time
-        if (index < size - 1) {
-          endTimeMap.put(deviceId, temp.get(index + 1).getStartTime(deviceId) 
- 1);
-        } else {
-          endTimeMap.put(deviceId, intervalFileNode.getEndTime(deviceId));
-        }
-      }
-      IntervalFileNode node = new IntervalFileNode(startTimeMap, endTimeMap,
-          intervalFileNode.getOverflowChangeType(), 
intervalFileNode.getBaseDirIndex(),
-          intervalFileNode.getRelativePath());
-      result.add(node);
-    }
-  }
-
-  private void switchMergeToWaiting(List<IntervalFileNode> 
backupIntervalFiles, boolean needEmpty)
-      throws FileNodeProcessorException {
-    LOGGER.info("The status of filenode processor {} switches from {} to {}.", 
getProcessorName(),
-        FileNodeProcessorStatus.MERGING_WRITE, 
FileNodeProcessorStatus.WAITING);
-    writeLock();
-    try {
-      oldMultiPassTokenSet = newMultiPassTokenSet;
-      oldMultiPassLock = newMultiPassLock;
-      newMultiPassTokenSet = new HashSet<>();
-      newMultiPassLock = new ReentrantReadWriteLock(false);
-      List<IntervalFileNode> result = new ArrayList<>();
-      int beginIndex = 0;
-      if (needEmpty) {
-        IntervalFileNode empty = backupIntervalFiles.get(0);
-        if (!empty.checkEmpty()) {
-          updateEmpty(empty, result);
-          beginIndex++;
-        }
-      }
-      // reconstruct the file index
-      addAllFileIntoIndex(backupIntervalFiles);
-      // check the merge changed file
-      for (int i = beginIndex; i < backupIntervalFiles.size(); i++) {
-        IntervalFileNode newFile = newFileNodes.get(i - beginIndex);
-        IntervalFileNode temp = backupIntervalFiles.get(i);
-        if (newFile.getOverflowChangeType() == 
OverflowChangeType.MERGING_CHANGE) {
-          updateMergeChanged(newFile, temp);
-        }
-        if (!temp.checkEmpty()) {
-          result.add(temp);
-        }
-      }
-      // add new file when merge
-      for (int i = backupIntervalFiles.size() - beginIndex; i < 
newFileNodes.size(); i++) {
-        IntervalFileNode fileNode = newFileNodes.get(i);
-        if (fileNode.isClosed()) {
-          result.add(fileNode.backUp());
-        } else {
-          result.add(fileNode);
-        }
-      }
-
-      isMerging = FileNodeProcessorStatus.WAITING;
-      newFileNodes = result;
-      // reconstruct the index
-      addAllFileIntoIndex(newFileNodes);
-      // clear merge changed
-      for (IntervalFileNode fileNode : newFileNodes) {
-        fileNode.clearMergeChanged();
-      }
-
-      synchronized (fileNodeProcessorStore) {
-        fileNodeProcessorStore.setFileNodeProcessorStatus(isMerging);
-        fileNodeProcessorStore.setEmptyIntervalFileNode(emptyIntervalFileNode);
-        fileNodeProcessorStore.setNewFileNodes(newFileNodes);
-        try {
-          writeStoreToDisk(fileNodeProcessorStore);
-        } catch (FileNodeProcessorException e) {
-          LOGGER.error(
-              "Merge: failed to write filenode information to revocery file, 
the filenode is " +
-                  "{}.",
-              getProcessorName(), e);
-          throw new FileNodeProcessorException(
-              "Merge: write filenode information to revocery file failed, the 
filenode is "
-                  + getProcessorName());
-        }
-      }
-    } finally {
-      writeUnlock();
-    }
-  }
 
-  private void updateEmpty(IntervalFileNode empty, List<IntervalFileNode> 
result) {
-    for (String deviceId : empty.getStartTimeMap().keySet()) {
-      if (invertedIndexOfFiles.containsKey(deviceId)) {
-        IntervalFileNode temp = invertedIndexOfFiles.get(deviceId).get(0);
-        if (temp.getMergeChanged().contains(deviceId)) {
-          empty.setOverflowChangeType(OverflowChangeType.CHANGED);
-          break;
-        }
-      }
-    }
-    empty.clearMergeChanged();
-    result.add(empty.backUp());
   }
 
-  private void updateMergeChanged(IntervalFileNode newFile, IntervalFileNode 
temp) {
-    for (String deviceId : newFile.getMergeChanged()) {
-      if (temp.getStartTimeMap().containsKey(deviceId)) {
-        temp.setOverflowChangeType(OverflowChangeType.CHANGED);
-      } else {
-        changeTypeToChanged(deviceId, newFile.getStartTime(deviceId),
-            newFile.getEndTime(deviceId));
-      }
-    }
-  }
-
-
-  private void switchWaitingToWorking()
-      throws FileNodeProcessorException {
-
-    LOGGER.info("The status of filenode processor {} switches from {} to {}.", 
getProcessorName(),
-        FileNodeProcessorStatus.WAITING, FileNodeProcessorStatus.NONE);
-
-    if (oldMultiPassLock != null) {
-      LOGGER.info("The old Multiple Pass Token set is {}, the old Multiple 
Pass Lock is {}",
-          oldMultiPassTokenSet,
-          oldMultiPassLock);
-      oldMultiPassLock.writeLock().lock();
-    }
-    try {
-      writeLock();
-      try {
-        // delete the all files which are in the newFileNodes
-        // notice: the last restore file of the interval file
-
-        List<String> bufferwriteDirPathList = 
directories.getAllTsFileFolders();
-        List<File> bufferwriteDirList = new ArrayList<>();
-        collectBufferWriteDirs(bufferwriteDirPathList, bufferwriteDirList);
-
-        Set<String> bufferFiles = new HashSet<>();
-        collectBufferWriteFiles(bufferFiles);
-
-        // add the restore file, if the last file is not closed
-        if (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() - 
1).isClosed()) {
-          String bufferFileRestorePath =
-              newFileNodes.get(newFileNodes.size() - 1).getFilePath() + 
RESTORE_FILE_SUFFIX;
-          bufferFiles.add(bufferFileRestorePath);
-        }
-
-        deleteBufferWriteFiles(bufferwriteDirList, bufferFiles);
-
-        // merge switch
-        changeFileNodes();
-
-        // overflow switch from merge to work
-        overflowProcessor.switchMergeToWork();
-        // write status to file
-        isMerging = FileNodeProcessorStatus.NONE;
-        synchronized (fileNodeProcessorStore) {
-          fileNodeProcessorStore.setFileNodeProcessorStatus(isMerging);
-          fileNodeProcessorStore.setNewFileNodes(newFileNodes);
-          
fileNodeProcessorStore.setEmptyIntervalFileNode(emptyIntervalFileNode);
-          writeStoreToDisk(fileNodeProcessorStore);
-        }
-      } catch (IOException e) {
-        LOGGER.info(
-            "The filenode processor {} encountered an error when its "
-                + "status switched from {} to {}.",
-            getProcessorName(), FileNodeProcessorStatus.NONE,
-            FileNodeProcessorStatus.MERGING_WRITE);
-        throw new FileNodeProcessorException(e);
-      } finally {
-        writeUnlock();
-      }
-    } finally {
-      oldMultiPassTokenSet = null;
-      if (oldMultiPassLock != null) {
-        oldMultiPassLock.writeLock().unlock();
-      }
-      oldMultiPassLock = null;
-    }
-
-  }
-
-  private void collectBufferWriteDirs(List<String> bufferwriteDirPathList,
-      List<File> bufferwriteDirList) {
-    for (String bufferwriteDirPath : bufferwriteDirPathList) {
-      if (bufferwriteDirPath.length() > 0
-          && bufferwriteDirPath.charAt(bufferwriteDirPath.length() - 1)
-          != File.separatorChar) {
-        bufferwriteDirPath = bufferwriteDirPath + File.separatorChar;
-      }
-      bufferwriteDirPath = bufferwriteDirPath + getProcessorName();
-      File bufferwriteDir = new File(bufferwriteDirPath);
-      bufferwriteDirList.add(bufferwriteDir);
-      if (!bufferwriteDir.exists()) {
-        bufferwriteDir.mkdirs();
-      }
-    }
-  }
-
-  private void collectBufferWriteFiles(Set<String> bufferFiles) {
-    for (IntervalFileNode bufferFileNode : newFileNodes) {
-      String bufferFilePath = bufferFileNode.getFilePath();
-      if (bufferFilePath != null) {
-        bufferFiles.add(bufferFilePath);
-      }
-    }
-  }
-
-  private void deleteBufferWriteFiles(List<File> bufferwriteDirList, 
Set<String> bufferFiles) {
-    for (File bufferwriteDir : bufferwriteDirList) {
-      File[] files = bufferwriteDir.listFiles();
-      if (files == null) {
-        continue;
-      }
-      for (File file : files) {
-        if (!bufferFiles.contains(file.getPath()) && !file.delete()) {
-          LOGGER.warn("Cannot delete BufferWrite file {}", file.getPath());
-        }
-      }
-    }
-  }
-
-  private void changeFileNodes() {
-    for (IntervalFileNode fileNode : newFileNodes) {
-      if (fileNode.getOverflowChangeType() != OverflowChangeType.NO_CHANGE) {
-        fileNode.setOverflowChangeType(OverflowChangeType.CHANGED);
-      }
-    }
-  }
-
-  private String queryAndWriteDataForMerge(IntervalFileNode backupIntervalFile)
-      throws IOException, FileNodeProcessorException, PathErrorException {
-    Map<String, Long> startTimeMap = new HashMap<>();
-    Map<String, Long> endTimeMap = new HashMap<>();
-
-    mergeFileWriter = null;
-    mergeOutputPath = null;
-    mergeBaseDir = null;
-    mergeFileName = null;
-    // modifications are blocked before mergeModification is created to avoid
-    // losing some modification.
-    mergeDeleteLock.lock();
-    QueryContext context = new QueryContext();
-    try {
-      for (String deviceId : backupIntervalFile.getStartTimeMap().keySet()) {
-        // query one deviceId
-        List<Path> pathList = new ArrayList<>();
-        mergeIsChunkGroupHasData = false;
-        mergeStartPos = -1;
-        ChunkGroupFooter footer;
-        int numOfChunk = 0;
-        try {
-          List<String> pathStrings = 
mManager.getLeafNodePathInNextLevel(deviceId);
-          for (String string : pathStrings) {
-            pathList.add(new Path(string));
-          }
-        } catch (PathErrorException e) {
-          LOGGER.error("Can't get all the paths from MManager, the deviceId is 
{}", deviceId);
-          throw new FileNodeProcessorException(e);
-        }
-        if (pathList.isEmpty()) {
-          continue;
-        }
-        for (Path path : pathList) {
-          // query one measurement in the special deviceId
-          String measurementId = path.getMeasurement();
-          TSDataType dataType = mManager.getSeriesType(path.getFullPath());
-          OverflowSeriesDataSource overflowSeriesDataSource = 
overflowProcessor.queryMerge(deviceId,
-              measurementId, dataType, true, context);
-          Filter timeFilter = FilterFactory
-              .and(TimeFilter.gtEq(backupIntervalFile.getStartTime(deviceId)),
-                  TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId)));
-          SingleSeriesExpression seriesFilter = new 
SingleSeriesExpression(path, timeFilter);
-          IReader seriesReader = SeriesReaderFactory.getInstance()
-              .createSeriesReaderForMerge(backupIntervalFile,
-                  overflowSeriesDataSource, seriesFilter, context);
-          numOfChunk += queryAndWriteSeries(seriesReader, path, seriesFilter, 
dataType,
-              startTimeMap, endTimeMap);
-        }
-        if (mergeIsChunkGroupHasData) {
-          // end the new rowGroupMetadata
-          long size = mergeFileWriter.getPos() - mergeStartPos;
-          footer = new ChunkGroupFooter(deviceId, size, numOfChunk);
-          mergeFileWriter.endChunkGroup(footer, 0);
-        }
-      }
-    } finally {
-      if (mergeDeleteLock.isLocked()) {
-        mergeDeleteLock.unlock();
-      }
-    }
-
-    if (mergeFileWriter != null) {
-      mergeFileWriter.endFile(fileSchema);
-    }
-    
backupIntervalFile.setBaseDirIndex(directories.getTsFileFolderIndex(mergeBaseDir));
-    backupIntervalFile.setRelativePath(mergeFileName);
-    backupIntervalFile.setOverflowChangeType(OverflowChangeType.NO_CHANGE);
-    backupIntervalFile.setStartTimeMap(startTimeMap);
-    backupIntervalFile.setEndTimeMap(endTimeMap);
-    backupIntervalFile.setModFile(mergingModification);
-    mergingModification = null;
-    return mergeFileName;
-  }
-
-  private int queryAndWriteSeries(IReader seriesReader, Path path,
-      SingleSeriesExpression seriesFilter, TSDataType dataType,
-      Map<String, Long> startTimeMap, Map<String, Long> endTimeMap)
-      throws IOException {
-    int numOfChunk = 0;
-    try {
-      if (!seriesReader.hasNext()) {
-        LOGGER.debug(
-            "The time-series {} has no data with the filter {} in the filenode 
processor {}",
-            path, seriesFilter, getProcessorName());
-      } else {
-        numOfChunk++;
-        TimeValuePair timeValuePair = seriesReader.next();
-        if (mergeFileWriter == null) {
-          mergeBaseDir = directories.getNextFolderForTsfile();
-          mergeFileName = timeValuePair.getTimestamp()
-              + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR + 
System.currentTimeMillis();
-          mergeOutputPath = constructOutputFilePath(mergeBaseDir, 
getProcessorName(),
-              mergeFileName);
-          mergeFileName = getProcessorName() + File.separatorChar + 
mergeFileName;
-          mergeFileWriter = new TsFileIOWriter(new File(mergeOutputPath));
-          mergingModification = new ModificationFile(mergeOutputPath
-              + ModificationFile.FILE_SUFFIX);
-          mergeDeleteLock.unlock();
-        }
-        if (!mergeIsChunkGroupHasData) {
-          // start a new rowGroupMetadata
-          mergeIsChunkGroupHasData = true;
-          // the datasize and numOfChunk is fake
-          // the accurate datasize and numOfChunk will get after write all 
this device data.
-          mergeFileWriter.startFlushChunkGroup(path.getDevice());// TODO 
please check me.
-          mergeStartPos = mergeFileWriter.getPos();
-        }
-        // init the serieswWriteImpl
-        MeasurementSchema measurementSchema = fileSchema
-            .getMeasurementSchema(path.getMeasurement());
-        ChunkBuffer pageWriter = new ChunkBuffer(measurementSchema);
-        int pageSizeThreshold = TSFileConfig.pageSizeInByte;
-        ChunkWriterImpl seriesWriterImpl = new 
ChunkWriterImpl(measurementSchema, pageWriter,
-            pageSizeThreshold);
-        // write the series data
-        writeOneSeries(path.getDevice(), seriesWriterImpl, dataType,
-            seriesReader,
-            startTimeMap, endTimeMap, timeValuePair);
-        // flush the series data
-        seriesWriterImpl.writeToFileWriter(mergeFileWriter);
-      }
-    } finally {
-      seriesReader.close();
-    }
-    return numOfChunk;
-  }
 
 
-  private void writeOneSeries(String deviceId, ChunkWriterImpl 
seriesWriterImpl,
-      TSDataType dataType, IReader seriesReader, Map<String, Long> 
startTimeMap,
-      Map<String, Long> endTimeMap, TimeValuePair firstTVPair) throws 
IOException {
-    long startTime;
-    long endTime;
-    TimeValuePair localTV = firstTVPair;
-    writeTVPair(seriesWriterImpl, dataType, localTV);
-    startTime = endTime = localTV.getTimestamp();
-    if (!startTimeMap.containsKey(deviceId) || startTimeMap.get(deviceId) > 
startTime) {
-      startTimeMap.put(deviceId, startTime);
-    }
-    if (!endTimeMap.containsKey(deviceId) || endTimeMap.get(deviceId) < 
endTime) {
-      endTimeMap.put(deviceId, endTime);
-    }
-    while (seriesReader.hasNext()) {
-      localTV = seriesReader.next();
-      endTime = localTV.getTimestamp();
-      writeTVPair(seriesWriterImpl, dataType, localTV);
-    }
-    if (!endTimeMap.containsKey(deviceId) || endTimeMap.get(deviceId) < 
endTime) {
-      endTimeMap.put(deviceId, endTime);
-    }
-  }
-
-  private void writeTVPair(ChunkWriterImpl seriesWriterImpl, TSDataType 
dataType,
-      TimeValuePair timeValuePair) throws IOException {
-    switch (dataType) {
-      case BOOLEAN:
-        seriesWriterImpl.write(timeValuePair.getTimestamp(), 
timeValuePair.getValue().getBoolean());
-        break;
-      case INT32:
-        seriesWriterImpl.write(timeValuePair.getTimestamp(), 
timeValuePair.getValue().getInt());
-        break;
-      case INT64:
-        seriesWriterImpl.write(timeValuePair.getTimestamp(), 
timeValuePair.getValue().getLong());
-        break;
-      case FLOAT:
-        seriesWriterImpl.write(timeValuePair.getTimestamp(), 
timeValuePair.getValue().getFloat());
-        break;
-      case DOUBLE:
-        seriesWriterImpl.write(timeValuePair.getTimestamp(), 
timeValuePair.getValue().getDouble());
-        break;
-      case TEXT:
-        seriesWriterImpl.write(timeValuePair.getTimestamp(), 
timeValuePair.getValue().getBinary());
-        break;
-      default:
-        LOGGER.error("Not support data type: {}", dataType);
-        break;
-    }
-  }
-
-
-  private String constructOutputFilePath(String baseDir, String processorName, 
String fileName) {
-
-    String localBaseDir = baseDir;
-    if (localBaseDir.charAt(localBaseDir.length() - 1) != File.separatorChar) {
-      localBaseDir = localBaseDir + File.separatorChar + processorName;
-    }
-    File dataDir = new File(localBaseDir);
-    if (!dataDir.exists()) {
-      LOGGER.warn("The bufferwrite processor data dir doesn't exists, create 
new directory {}",
-          localBaseDir);
-      dataDir.mkdirs();
-    }
-    File outputFile = new File(dataDir, fileName);
-    return outputFile.getPath();
-  }
-
-  private FileSchema constructFileSchema(String processorName) throws 
WriteProcessException {
-
-    List<MeasurementSchema> columnSchemaList;
-    columnSchemaList = mManager.getSchemaForFileName(processorName);
-
-    FileSchema schema = new FileSchema();
-    for (MeasurementSchema measurementSchema : columnSchemaList) {
-      schema.registerMeasurement(measurementSchema);
-    }
-    return schema;
-
-  }
-
   @Override
   public boolean canBeClosed() {
-    if (isMerging != FileNodeProcessorStatus.NONE) {
-      LOGGER.info("The filenode {} can't be closed, because the filenode 
status is {}",
-          getProcessorName(),
-          isMerging);
-      return false;
-    }
-    if (!newMultiPassLock.writeLock().tryLock()) {
-      LOGGER.info("The filenode {} can't be closed, because it can't get 
newMultiPassLock {}",
-          getProcessorName(), newMultiPassLock);
-      return false;
-    }
-
-    try {
-      if (oldMultiPassLock == null) {
-        return true;
-      }
-      if (oldMultiPassLock.writeLock().tryLock()) {
-        try {
-          return true;
-        } finally {
-          oldMultiPassLock.writeLock().unlock();
-        }
-      } else {
-        LOGGER.info("The filenode {} can't be closed, because it can't get"
-                + " oldMultiPassLock {}",
-            getProcessorName(), oldMultiPassLock);
-        return false;
-      }
-    } finally {
-      newMultiPassLock.writeLock().unlock();
-    }
+
   }
 
   @Override
   public FileNodeFlushFuture flush() throws IOException {
-    Future<Boolean> bufferWriteFlushFuture = null;
-    Future<Boolean> overflowFlushFuture = null;
-    if (bufferWriteProcessor != null) {
-      bufferWriteFlushFuture = bufferWriteProcessor.flush();
-    }
-    if (overflowProcessor != null) {
-      overflowFlushFuture = overflowProcessor.flush();
-    }
-    return new FileNodeFlushFuture(bufferWriteFlushFuture, 
overflowFlushFuture);
-  }
 
-  /**
-   * Close the bufferwrite processor.
-   */
-  public void closeBufferWrite() throws FileNodeProcessorException {
-    if (bufferWriteProcessor == null) {
-      return;
-    }
-    try {
-      while (!bufferWriteProcessor.canBeClosed()) {
-        waitForBufferWriteClose();
-      }
-      bufferWriteProcessor.close();
-      bufferWriteProcessor = null;
-    } catch (BufferWriteProcessorException e) {
-      throw new FileNodeProcessorException(e);
-    }
   }
 
-  private void waitForBufferWriteClose() {
-    try {
-      LOGGER.info("The bufferwrite {} can't be closed, wait 100ms",
-          bufferWriteProcessor.getProcessorName());
-      TimeUnit.MICROSECONDS.sleep(100);
-    } catch (InterruptedException e) {
-      LOGGER.error("Unexpected interruption", e);
-      Thread.currentThread().interrupt();
-    }
-  }
+
 
   /**
    * Close the overflow processor.
    */
   public void closeOverflow() throws FileNodeProcessorException {
-    if (overflowProcessor == null) {
-      return;
-    }
-    try {
-      while (!overflowProcessor.canBeClosed()) {
-        waitForOverflowClose();
-      }
-      overflowProcessor.close();
-      overflowProcessor.clear();
-      overflowProcessor = null;
-    } catch (OverflowProcessorException | IOException e) {
-      throw new FileNodeProcessorException(e);
-    }
-  }
 
-  private void waitForOverflowClose() {
-    try {
-      LOGGER.info("The overflow {} can't be closed, wait 100ms",
-          overflowProcessor.getProcessorName());
-      TimeUnit.MICROSECONDS.sleep(100);
-    } catch (InterruptedException e) {
-      LOGGER.error("Unexpected interruption", e);
-      Thread.currentThread().interrupt();
-    }
   }
 
+
+
   @Override
   public void close() throws FileNodeProcessorException {
-    closeBufferWrite();
-    closeOverflow();
-    for (IntervalFileNode fileNode : newFileNodes) {
-      if (fileNode.getModFile() != null) {
-        try {
-          fileNode.getModFile().close();
-        } catch (IOException e) {
-          throw new FileNodeProcessorException(e);
-        }
-      }
-    }
+
   }
 
   /**
    * deregister the filenode processor.
    */
   public void delete() throws ProcessorException {
-    if (TsFileDBConf.enableStatMonitor) {
-      // remove the monitor
-      LOGGER.info("Deregister the filenode processor: {} from monitor.", 
getProcessorName());
-      StatMonitor.getInstance().deregistStatistics(statStorageDeltaName);
-    }
-    closeBufferWrite();
-    closeOverflow();
-    for (IntervalFileNode fileNode : newFileNodes) {
-      if (fileNode.getModFile() != null) {
-        try {
-          fileNode.getModFile().close();
-        } catch (IOException e) {
-          throw new FileNodeProcessorException(e);
-        }
-      }
-    }
+
   }
 
   @Override
   public long memoryUsage() {
-    long memSize = 0;
-    if (bufferWriteProcessor != null) {
-      memSize += bufferWriteProcessor.memoryUsage();
-    }
-    if (overflowProcessor != null) {
-      memSize += overflowProcessor.memoryUsage();
-    }
-    return memSize;
-  }
-
-  private void writeStoreToDisk(FileNodeProcessorStore fileNodeProcessorStore)
-      throws FileNodeProcessorException {
 
-    synchronized (fileNodeRestoreLock) {
-      SerializeUtil<FileNodeProcessorStore> serializeUtil = new 
SerializeUtil<>();
-      try {
-        serializeUtil.serialize(fileNodeProcessorStore, 
fileNodeRestoreFilePath);
-        LOGGER.debug("The filenode processor {} writes restore information to 
the restore file",
-            getProcessorName());
-      } catch (IOException e) {
-        throw new FileNodeProcessorException(e);
-      }
-    }
   }
 
-  private FileNodeProcessorStore readStoreFromDisk() throws 
FileNodeProcessorException {
-
-    synchronized (fileNodeRestoreLock) {
-      FileNodeProcessorStore processorStore;
-      SerializeUtil<FileNodeProcessorStore> serializeUtil = new 
SerializeUtil<>();
-      try {
-        processorStore = serializeUtil.deserialize(fileNodeRestoreFilePath)
-            .orElse(new FileNodeProcessorStore(false, new HashMap<>(),
-                new IntervalFileNode(OverflowChangeType.NO_CHANGE, null),
-                new ArrayList<>(), FileNodeProcessorStatus.NONE, 0));
-      } catch (IOException e) {
-        throw new FileNodeProcessorException(e);
-      }
-      return processorStore;
-    }
-  }
-
-  String getFileNodeRestoreFilePath() {
-    return fileNodeRestoreFilePath;
-  }
 
   /**
    * Delete data whose timestamp <= 'timestamp' and belong to timeseries 
deviceId.measurementId.
@@ -1848,76 +310,17 @@ public class FileNodeProcessor2 extends Processor 
implements IStatistic {
    * @param timestamp the delete range is (0, timestamp].
    */
   public void delete(String deviceId, String measurementId, long timestamp) 
throws IOException {
-    // TODO: how to avoid partial deletion?
-    mergeDeleteLock.lock();
-    long version = versionController.nextVersion();
-
-    // record what files are updated so we can roll back them in case of 
exception
-    List<ModificationFile> updatedModFiles = new ArrayList<>();
-
-    try {
-      String fullPath = deviceId +
-          IoTDBConstant.PATH_SEPARATOR + measurementId;
-      Deletion deletion = new Deletion(fullPath, version, timestamp);
-      if (mergingModification != null) {
-        mergingModification.write(deletion);
-        updatedModFiles.add(mergingModification);
-      }
-      deleteBufferWriteFiles(deviceId, deletion, updatedModFiles);
-      // delete data in memory
-      OverflowProcessor ofProcessor = getOverflowProcessor(getProcessorName());
-      ofProcessor.delete(deviceId, measurementId, timestamp, version, 
updatedModFiles);
-      if (bufferWriteProcessor != null) {
-        bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
-      }
-    } catch (Exception e) {
-      // roll back
-      for (ModificationFile modFile : updatedModFiles) {
-        modFile.abort();
-      }
-      throw new IOException(e);
-    } finally {
-      mergeDeleteLock.unlock();
-    }
-  }
 
-  private void deleteBufferWriteFiles(String deviceId, Deletion deletion,
-      List<ModificationFile> updatedModFiles) throws IOException {
-    if (currentIntervalFileNode != null && 
currentIntervalFileNode.containsDevice(deviceId)) {
-      currentIntervalFileNode.getModFile().write(deletion);
-      updatedModFiles.add(currentIntervalFileNode.getModFile());
-    }
-    for (IntervalFileNode fileNode : newFileNodes) {
-      if (fileNode != currentIntervalFileNode && 
fileNode.containsDevice(deviceId)
-          && fileNode.getStartTime(deviceId) <= deletion.getTimestamp()) {
-        fileNode.getModFile().write(deletion);
-        updatedModFiles.add(fileNode.getModFile());
-      }
-    }
   }
 
+
+
   /**
    * Similar to delete(), but only deletes data in BufferWrite. Only used by 
WAL recovery.
    */
   public void deleteBufferWrite(String deviceId, String measurementId, long 
timestamp)
       throws IOException {
-    String fullPath = deviceId +
-        IoTDBConstant.PATH_SEPARATOR + measurementId;
-    long version = versionController.nextVersion();
-    Deletion deletion = new Deletion(fullPath, version, timestamp);
-
-    List<ModificationFile> updatedModFiles = new ArrayList<>();
-    try {
-      deleteBufferWriteFiles(deviceId, deletion, updatedModFiles);
-    } catch (IOException e) {
-      for (ModificationFile modificationFile : updatedModFiles) {
-        modificationFile.abort();
-      }
-      throw e;
-    }
-    if (bufferWriteProcessor != null) {
-      bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
-    }
+
   }
 
   /**
@@ -1925,97 +328,10 @@ public class FileNodeProcessor2 extends Processor 
implements IStatistic {
    */
   public void deleteOverflow(String deviceId, String measurementId, long 
timestamp)
       throws IOException {
-    long version = versionController.nextVersion();
-
-    OverflowProcessor overflowProcessor = 
getOverflowProcessor(getProcessorName());
-    List<ModificationFile> updatedModFiles = new ArrayList<>();
-    try {
-      overflowProcessor.delete(deviceId, measurementId, timestamp, version, 
updatedModFiles);
-    } catch (IOException e) {
-      for (ModificationFile modificationFile : updatedModFiles) {
-        modificationFile.abort();
-      }
-      throw e;
-    }
-  }
 
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    if (!super.equals(o)) {
-      return false;
-    }
-    FileNodeProcessor2 that = (FileNodeProcessor2) o;
-    return isOverflowed == that.isOverflowed &&
-        numOfMergeFile == that.numOfMergeFile &&
-        lastMergeTime == that.lastMergeTime &&
-        shouldRecovery == that.shouldRecovery &&
-        multiPassLockToken == that.multiPassLockToken &&
-        Objects.equals(statStorageDeltaName, that.statStorageDeltaName) &&
-        Objects.equals(statParamsHashMap, that.statParamsHashMap) &&
-        Objects.equals(lastUpdateTimeMap, that.lastUpdateTimeMap) &&
-        Objects.equals(flushLastUpdateTimeMap, that.flushLastUpdateTimeMap) &&
-        Objects.equals(invertedIndexOfFiles, that.invertedIndexOfFiles) &&
-        Objects.equals(emptyIntervalFileNode, that.emptyIntervalFileNode) &&
-        Objects.equals(currentIntervalFileNode, that.currentIntervalFileNode) 
&&
-        Objects.equals(newFileNodes, that.newFileNodes) &&
-        isMerging == that.isMerging &&
-        Objects.equals(fileNodeProcessorStore, that.fileNodeProcessorStore) &&
-        Objects.equals(fileNodeRestoreFilePath, that.fileNodeRestoreFilePath) 
&&
-        Objects.equals(baseDirPath, that.baseDirPath) &&
-        Objects.equals(bufferWriteProcessor, that.bufferWriteProcessor) &&
-        Objects.equals(overflowProcessor, that.overflowProcessor) &&
-        Objects.equals(oldMultiPassTokenSet, that.oldMultiPassTokenSet) &&
-        Objects.equals(newMultiPassTokenSet, that.newMultiPassTokenSet) &&
-        Objects.equals(oldMultiPassLock, that.oldMultiPassLock) &&
-        Objects.equals(newMultiPassLock, that.newMultiPassLock) &&
-        Objects.equals(parameters, that.parameters) &&
-        Objects.equals(fileSchema, that.fileSchema) &&
-        Objects.equals(flushFileNodeProcessorAction, 
that.flushFileNodeProcessorAction) &&
-        Objects.equals(bufferwriteFlushAction, that.bufferwriteFlushAction) &&
-        Objects.equals(bufferwriteCloseAction, that.bufferwriteCloseAction) &&
-        Objects.equals(overflowFlushAction, that.overflowFlushAction);
   }
 
-  @Override
-  public int hashCode() {
-    return Objects.hash(super.hashCode(), statStorageDeltaName, 
statParamsHashMap, isOverflowed,
-        lastUpdateTimeMap, flushLastUpdateTimeMap, invertedIndexOfFiles,
-        emptyIntervalFileNode, currentIntervalFileNode, newFileNodes, 
isMerging,
-        numOfMergeFile, fileNodeProcessorStore, fileNodeRestoreFilePath, 
baseDirPath,
-        lastMergeTime, bufferWriteProcessor, overflowProcessor, 
oldMultiPassTokenSet,
-        newMultiPassTokenSet, oldMultiPassLock, newMultiPassLock, 
shouldRecovery, parameters,
-        fileSchema, flushFileNodeProcessorAction, bufferwriteFlushAction,
-        bufferwriteCloseAction, overflowFlushAction, multiPassLockToken);
-  }
 
-  public class MergeRunnale implements Runnable {
-
-    @Override
-    public void run() {
-      try {
-        ZoneId zoneId = IoTDBDescriptor.getInstance().getConfig().getZoneID();
-        long mergeStartTime = System.currentTimeMillis();
-        writeLock();
-        merge();
-        long mergeEndTime = System.currentTimeMillis();
-        long intervalTime = mergeEndTime - mergeStartTime;
-        LOGGER.info(
-            "The filenode processor {} merge start time is {}, "
-                + "merge end time is {}, merge consumes {}ms.",
-            getProcessorName(), ofInstant(Instant.ofEpochMilli(mergeStartTime),
-                zoneId), ofInstant(Instant.ofEpochMilli(mergeEndTime),
-                zoneId), intervalTime);
-      } catch (FileNodeProcessorException e) {
-        LOGGER.error("The filenode processor {} encountered an error when 
merging.",
-            getProcessorName(), e);
-        throw new ErrorDebugException(e);
-      }
-    }
-  }
+
+
 }
\ No newline at end of file
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/OverflowChangeType.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/OverflowChangeType.java
index 5bbf034..925b2fc 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/OverflowChangeType.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/OverflowChangeType.java
@@ -24,7 +24,17 @@ package org.apache.iotdb.db.engine.filenode;
  * If it's changed and in CHANGED previously, and in merging, 
CHANGED-->MERGING_CHANGE, update file<br>
  * If it's changed and in CHANGED previously, and not in merging, do 
nothing<br>
  * After merging, if it's MERGING_CHANGE, MERGING_CHANGE-->CHANGED, otherwise 
in NO_CHANGE, MERGING_CHANGE-->NO_CHANGE
+ *<pre>
+ *    +----------+ 2
+ *    v          +
+ * NORMAL+---->CHANGED
+ *         1    + 3
+ *              v
+ *  CHANGED_WHEN_MERGE
+ *</pre>
  */
+
 public enum OverflowChangeType {
   NO_CHANGE, CHANGED, MERGING_CHANGE,
-}
\ No newline at end of file
+}
+
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/FileNodeFlushFutureTest.java
 
b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/FileNodeFlushFutureTest.java
new file mode 100644
index 0000000..b5973a9
--- /dev/null
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenode/FileNodeFlushFutureTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.filenode;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FileNodeFlushFutureTest {
+  
+
+    FileNodeFlushFuture future1;
+    FileNodeFlushFuture future2;
+    FileNodeFlushFuture future3;
+    FileNodeFlushFuture future4;
+
+    @Before
+    public void setUp() throws Exception {
+      future1 = new FileNodeFlushFuture(new CanCanceledFuture(), new 
CanCanceledFuture());
+      future2 = new FileNodeFlushFuture(new CanNotCanceledFuture(), new 
CanNotCanceledFuture());
+      future3 = new FileNodeFlushFuture(new CanCanceledFuture(), new 
CanNotCanceledFuture());
+      future4 = new FileNodeFlushFuture(new CanNotCanceledFuture(), new 
CanCanceledFuture());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+      //nothing
+    }
+
+    @Test
+    public void cancel() {
+      Assert.assertTrue(future1.cancel(true));
+      Assert.assertFalse(future2.cancel(true));
+      Assert.assertFalse(future3.cancel(true));
+      Assert.assertFalse(future4.cancel(true));
+    }
+
+    @Test
+    public void isCancelled() {
+      Assert.assertFalse(future1.isCancelled());
+      Assert.assertFalse(future2.isCancelled());
+      Assert.assertFalse(future3.isCancelled());
+      Assert.assertFalse(future4.isCancelled());
+
+      future1.cancel(true);
+      future2.cancel(true);
+      future3.cancel(true);
+      future4.cancel(true);
+
+      Assert.assertTrue(future1.isCancelled());
+      Assert.assertFalse(future2.isCancelled());
+      Assert.assertFalse(future3.isCancelled());
+      Assert.assertFalse(future4.isCancelled());
+    }
+
+    @Test
+    public void isDone() throws ExecutionException, InterruptedException, 
TimeoutException {
+      Assert.assertFalse(future1.isDone());
+      Assert.assertFalse(future2.isDone());
+      Assert.assertFalse(future3.isDone());
+      Assert.assertFalse(future4.isDone());
+
+      future1.get();
+      future2.get();
+      future3.get(2, TimeUnit.MILLISECONDS);
+      future4.get(2, TimeUnit.MILLISECONDS);
+
+      Assert.assertTrue(future1.isDone());
+      Assert.assertTrue(future2.isDone());
+      Assert.assertTrue(future3.isDone());
+      Assert.assertTrue(future4.isDone());
+    }
+
+    @Test
+    public void isHasOverflowFlushTask() {
+      Assert.assertTrue(future1.isHasOverflowFlushTask());
+    }
+
+
+    private class CanCanceledFuture implements Future<Boolean> {
+      boolean cancel = false;
+      boolean done = false;
+      @Override
+      public boolean cancel(boolean mayInterruptIfRunning) {
+        cancel = true;
+        done = true;
+        return true;
+      }
+
+      @Override
+      public boolean isCancelled() {
+        return cancel;
+      }
+
+      @Override
+      public boolean isDone() {
+        if (cancel || done) {
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      public Boolean get() {
+        done = true;
+        return true;
+      }
+
+      @Override
+      public Boolean get(long timeout, TimeUnit unit) {
+        done =true;
+        return true;
+      }
+    }
+
+    private class CanNotCanceledFuture implements Future<Boolean> {
+      boolean done = false;
+      @Override
+      public boolean cancel(boolean mayInterruptIfRunning) {
+        return false;
+      }
+
+      @Override
+      public boolean isCancelled() {
+        return false;
+      }
+
+      @Override
+      public boolean isDone() {
+        return done;
+      }
+
+      @Override
+      public Boolean get() {
+        done = true;
+        return true;
+      }
+
+      @Override
+      public Boolean get(long timeout, TimeUnit unit) {
+        done =true;
+        return true;
+      }
+    }
+  
+}

Reply via email to