This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f6f216e  Integrate data file version recording with time partitioning (#935)
f6f216e is described below

commit f6f216e888459dedf392cbe1d7754f097e11db91
Author: Jiang Tian <jt2594...@163.com>
AuthorDate: Wed Mar 25 03:35:21 2020 -0500

    Integrate data file version recording with time partitioning (#935)
    
    * integrate data partition with file version management
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  12 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 394 ++++++++++++++-------
 .../iotdb/db/exception/LoadEmptyFileException.java |  29 ++
 .../iotdb/db/exception/LoadFileException.java      |  33 ++
 .../db/exception/PartitionViolationException.java  |  29 ++
 .../iotdb/db/sync/receiver/load/FileLoader.java    |   4 +-
 .../integration/IoTDBLoadExternalTsfileTest.java   |  27 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 8 files changed, 380 insertions(+), 149 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index c63f728..c2da578 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -48,8 +48,8 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
@@ -481,13 +481,13 @@ public class StorageEngine implements IService {
   }
 
   public void loadNewTsFileForSync(TsFileResource newTsFileResource)
-      throws TsFileProcessorException, StorageEngineException {
+      throws StorageEngineException, LoadFileException {
     getProcessor(newTsFileResource.getFile().getParentFile().getName())
         .loadNewTsFileForSync(newTsFileResource);
   }
 
   public void loadNewTsFile(TsFileResource newTsFileResource)
-      throws TsFileProcessorException, StorageEngineException, 
MetadataException {
+      throws LoadFileException, StorageEngineException, MetadataException {
     Map<String, Long> startTimeMap = newTsFileResource.getStartTimeMap();
     if (startTimeMap == null || startTimeMap.isEmpty()) {
       throw new StorageEngineException("Can not get the corresponding storage group.");
@@ -549,10 +549,10 @@ public class StorageEngine implements IService {
     this.fileFlushPolicy = fileFlushPolicy;
   }
 
-  public boolean isFileAlreadyExist(TsFileResource tsFileResource, String 
storageGroup) {
-    // TODO-Cluster#350: integrate with time partitioning
+  public boolean isFileAlreadyExist(TsFileResource tsFileResource, String 
storageGroup,
+      long partitionNum) {
     StorageGroupProcessor processor = processorMap.get(storageGroup);
-    return processor != null && processor.isFileAlreadyExist(tsFileResource);
+    return processor != null && processor.isFileAlreadyExist(tsFileResource, 
partitionNum);
   }
 
   public static long getTimePartitionInterval() {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 295a074..d6b2c98 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -18,6 +18,28 @@
  */
 package org.apache.iotdb.db.engine.storagegroup;
 
+import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
+import static 
org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
+import static 
org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -40,7 +62,14 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.*;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.exception.LoadEmptyFileException;
+import org.apache.iotdb.db.exception.LoadFileException;
+import org.apache.iotdb.db.exception.MergeException;
+import org.apache.iotdb.db.exception.PartitionViolationException;
+import org.apache.iotdb.db.exception.StorageGroupProcessorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -53,6 +82,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryFileManager;
 import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
+import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.db.utils.UpgradeUtils;
 import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -73,17 +103,6 @@ import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
-import static 
org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
-import static 
org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-
 
 /**
  * For sequence data, a StorageGroupProcessor has some TsFileProcessors, in 
which there is only one
@@ -110,6 +129,15 @@ public class StorageGroupProcessor {
   private static final String MERGING_MODIFICATION_FILE_NAME = "merge.mods";
   private static final Logger logger = 
LoggerFactory.getLogger(StorageGroupProcessor.class);
   private static final int MAX_CACHE_SENSORS = 5000;
+
+  /**
+   * indicating the file to be loaded already exists locally.
+   */
+  private static final int POS_ALREADY_EXIST = -2;
+  /**
+   * indicating the file to be loaded overlap with some files.
+   */
+  private static final int POS_OVERLAP = -3;
   /**
    * a read write lock for guaranteeing concurrent safety when accessing all 
fields in this class
    * (i.e., schema, (un)sequenceFileList, work(un)SequenceTsFileProcessor,
@@ -203,9 +231,16 @@ public class StorageGroupProcessor {
   private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
   private TsFileFlushPolicy fileFlushPolicy;
 
-  // allDirectFileVersions records the versions of the direct TsFiles 
(generated by flush), not
-  // including the files generated by merge
-  private Set<Long> allDirectFileVersions = new HashSet<>();
+  /**
+   * partitionDirectFileVersions records the versions of the direct TsFiles 
(generated by close,
+   * not including the files generated by merge) of each partition.
+   * As data file close is managed by the leader in the distributed version, 
the files with the
+   * same version(s) have the same data, despite that the inner structure (the 
size and
+   * organization of chunks) may be different, so we can easily find what 
remote files we do not
+   * have locally.
+   * partition number -> version number set
+   */
+  private Map<Long, Set<Long>> partitionDirectFileVersions = new HashMap<>();
 
   public StorageGroupProcessor(String systemInfoDir, String storageGroupName,
       TsFileFlushPolicy fileFlushPolicy)
@@ -246,14 +281,18 @@ public class StorageGroupProcessor {
         if (resource.getFile().length() == 0) {
           deleteTsfile(resource.getFile());
         }
-        allDirectFileVersions.addAll(resource.getHistoricalVersions());
+        String[] filePathSplit = FilePathUtils.splitTsFilePath(resource);
+        long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length 
- 2]);
+        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new 
HashSet<>()).addAll(resource.getHistoricalVersions());
       }
       for (TsFileResource resource : unseqTsFiles) {
         //After recover, case the TsFile's length is equal to 0, delete both 
the TsFileResource and the file itself
         if (resource.getFile().length() == 0) {
           deleteTsfile(resource.getFile());
         }
-        allDirectFileVersions.addAll(resource.getHistoricalVersions());
+        String[] filePathSplit = FilePathUtils.splitTsFilePath(resource);
+        long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length 
- 2]);
+        partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new 
HashSet<>()).addAll(resource.getHistoricalVersions());
       }
 
       String taskName = storageGroupName + "-" + System.currentTimeMillis();
@@ -307,17 +346,15 @@ public class StorageGroupProcessor {
    * @return version controller
    */
   private VersionController getVersionControllerByTimePartitionId(long 
timePartitionId) {
-    VersionController res = 
timePartitionIdVersionControllerMap.get(timePartitionId);
-    if (res == null) {
-      try {
-        res = new SimpleFileVersionController(storageGroupSysDir.getPath(), 
timePartitionId);
-        timePartitionIdVersionControllerMap.put(timePartitionId, res);
-      } catch (IOException e) {
-        logger.error("can't build a version controller for time partition" + 
timePartitionId);
-      }
-    }
-
-    return res;
+    return timePartitionIdVersionControllerMap.computeIfAbsent(timePartitionId,
+        id -> {
+          try {
+            return new 
SimpleFileVersionController(storageGroupSysDir.getPath(), timePartitionId);
+          } catch (IOException e) {
+            logger.error("can't build a version controller for time partition {}", timePartitionId);
+            return null;
+          }
+        });
   }
 
   private List<TsFileResource> getAllFiles(List<String> folders) {
@@ -328,17 +365,20 @@ public class StorageGroupProcessor {
         continue;
       }
 
-      for (File timeRangeFileFolder : fileFolder.listFiles()) {
-        // some TsFileResource may be being persisted when the system crashed, 
try recovering such
-        // resources
-        continueFailedRenames(timeRangeFileFolder, TEMP_SUFFIX);
+      File[] subFiles = fileFolder.listFiles();
+      if (subFiles != null) {
+        for (File partitionFolder : subFiles) {
+          // some TsFileResource may be being persisted when the system 
crashed, try recovering such
+          // resources
+          continueFailedRenames(partitionFolder, TEMP_SUFFIX);
 
-        // some TsFiles were going to be replaced by the merged files when the 
system crashed and
-        // the process was interrupted before the merged files could be named
-        continueFailedRenames(timeRangeFileFolder, MERGE_SUFFIX);
+          // some TsFiles were going to be replaced by the merged files when 
the system crashed and
+          // the process was interrupted before the merged files could be named
+          continueFailedRenames(partitionFolder, MERGE_SUFFIX);
 
-        Collections.addAll(tsFiles,
-            fsFactory.listFilesBySuffix(timeRangeFileFolder.getAbsolutePath(), 
TSFILE_SUFFIX));
+          Collections.addAll(tsFiles,
+              fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), 
TSFILE_SUFFIX));
+        }
       }
 
     }
@@ -796,12 +836,12 @@ public class StorageGroupProcessor {
    * @return file name
    */
   private String getNewTsFileName(long timePartitionId) {
-    return getNewTsFileName(System.currentTimeMillis(),
-        getVersionControllerByTimePartitionId(timePartitionId).nextVersion(), 
0);
+    long version = 
getVersionControllerByTimePartitionId(timePartitionId).nextVersion();
+    partitionDirectFileVersions.computeIfAbsent(timePartitionId, p -> new 
HashSet<>()).add(version);
+    return getNewTsFileName(System.currentTimeMillis(), version, 0);
   }
 
   private String getNewTsFileName(long time, long version, int mergeCnt) {
-    allDirectFileVersions.add(version);
     return time + IoTDBConstant.TSFILE_NAME_SEPARATOR + version
         + IoTDBConstant.TSFILE_NAME_SEPARATOR + mergeCnt + TSFILE_SUFFIX;
   }
@@ -1185,22 +1225,7 @@ public class StorageGroupProcessor {
       // time partition to divide storage group
       long timePartitionId = StorageEngine.fromTimeToTimePartition(timestamp);
       // write log
-      if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
-        DeletePlan deletionPlan = new DeletePlan(timestamp, new Path(deviceId, 
measurementId));
-        for (Map.Entry<Long, TsFileProcessor> entry : 
workSequenceTsFileProcessors.entrySet()) {
-          if (entry.getKey() <= timePartitionId) {
-            entry.getValue().getLogNode()
-                .write(deletionPlan);
-          }
-        }
-
-        for (Map.Entry<Long, TsFileProcessor> entry : 
workUnsequenceTsFileProcessors.entrySet()) {
-          if (entry.getKey() <= timePartitionId) {
-            entry.getValue().getLogNode()
-                .write(deletionPlan);
-          }
-        }
-      }
+      logDeletion(timestamp, deviceId, measurementId, timePartitionId);
 
       Path fullPath = new Path(deviceId, measurementId);
       Deletion deletion = new Deletion(fullPath,
@@ -1225,6 +1250,24 @@ public class StorageGroupProcessor {
     }
   }
 
+  private void logDeletion(long timestamp, String deviceId, String 
measurementId, long timePartitionId)
+      throws IOException {
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
+      DeletePlan deletionPlan = new DeletePlan(timestamp, new Path(deviceId, 
measurementId));
+      for (Map.Entry<Long, TsFileProcessor> entry : 
workSequenceTsFileProcessors.entrySet()) {
+        if (entry.getKey() <= timePartitionId) {
+          entry.getValue().getLogNode().write(deletionPlan);
+        }
+      }
+
+      for (Map.Entry<Long, TsFileProcessor> entry : 
workUnsequenceTsFileProcessors.entrySet()) {
+        if (entry.getKey() <= timePartitionId) {
+          entry.getValue().getLogNode().write(deletionPlan);
+        }
+      }
+    }
+  }
+
 
   private void deleteDataInFiles(Collection<TsFileResource> 
tsFileResourceList, Deletion deletion,
       List<ModificationFile> updatedModFiles)
@@ -1521,13 +1564,14 @@ public class StorageGroupProcessor {
    * @param newTsFileResource tsfile resource
    * @UsedBy sync module.
    */
-  public void loadNewTsFileForSync(TsFileResource newTsFileResource)
-      throws TsFileProcessorException {
+  public void loadNewTsFileForSync(TsFileResource newTsFileResource) throws 
LoadFileException {
     File tsfileToBeInserted = newTsFileResource.getFile();
+    long newFilePartitionId = getNewFilePartitionId(newTsFileResource);
     writeLock();
     mergeLock.writeLock().lock();
     try {
-      if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, 
newTsFileResource)){
+      if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, 
newTsFileResource,
+          newFilePartitionId)){
         updateLatestTimeMap(newTsFileResource);
       }
     } catch (DiskSpaceInsufficientException e) {
@@ -1535,7 +1579,7 @@ public class StorageGroupProcessor {
           "Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
           tsfileToBeInserted.getAbsolutePath(), tsfileToBeInserted.getParentFile().getName());
       IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
-      throw new TsFileProcessorException(e);
+      throw new LoadFileException(e);
     } finally {
       mergeLock.writeLock().unlock();
       writeUnlock();
@@ -1556,82 +1600,51 @@ public class StorageGroupProcessor {
    * @param newTsFileResource tsfile resource
    * @UsedBy load external tsfile module
    */
-  public void loadNewTsFile(TsFileResource newTsFileResource)
-      throws TsFileProcessorException {
+  public void loadNewTsFile(TsFileResource newTsFileResource) throws 
LoadFileException {
     File tsfileToBeInserted = newTsFileResource.getFile();
+    long newFilePartitionId = getNewFilePartitionId(newTsFileResource);
     writeLock();
     mergeLock.writeLock().lock();
     try {
-      boolean isOverlap = false;
-      int preIndex = -1, subsequentIndex = sequenceFileTreeSet.size();
-
       List<TsFileResource> sequenceList = new ArrayList<>(sequenceFileTreeSet);
-      // check new tsfile
-      outer:
-      for (int i = 0; i < sequenceList.size(); i++) {
-        if 
(sequenceList.get(i).getFile().getName().equals(tsfileToBeInserted.getName())) {
-          return;
-        }
-        if (i == sequenceList.size() - 1 && 
sequenceList.get(i).getEndTimeMap().isEmpty()) {
-          continue;
-        }
-        boolean hasPre = false, hasSubsequence = false;
-        for (String device : newTsFileResource.getStartTimeMap().keySet()) {
-          if (sequenceList.get(i).getStartTimeMap().containsKey(device)) {
-            long startTime1 = 
sequenceList.get(i).getStartTimeMap().get(device);
-            long endTime1 = sequenceList.get(i).getEndTimeMap().get(device);
-            long startTime2 = newTsFileResource.getStartTimeMap().get(device);
-            long endTime2 = newTsFileResource.getEndTimeMap().get(device);
-            if (startTime1 > endTime2) {
-              hasSubsequence = true;
-            } else if (startTime2 > endTime1) {
-              hasPre = true;
-            } else {
-              isOverlap = true;
-              break outer;
-            }
-          }
-        }
-        if (hasPre && hasSubsequence) {
-          isOverlap = true;
-          break;
-        }
-        if (!hasPre && hasSubsequence) {
-          subsequentIndex = i;
-          break;
-        }
-        if (hasPre) {
-          preIndex = i;
-        }
+
+      int insertPos = findInsertionPosition(newTsFileResource, 
newFilePartitionId, sequenceList);
+      if (insertPos == POS_ALREADY_EXIST) {
+        return;
       }
 
       // loading tsfile by type
-      if (isOverlap) {
-        loadTsFileByType(LoadTsFileType.LOAD_UNSEQUENCE, tsfileToBeInserted, 
newTsFileResource);
+      if (insertPos == POS_OVERLAP) {
+        loadTsFileByType(LoadTsFileType.LOAD_UNSEQUENCE, tsfileToBeInserted, 
newTsFileResource,
+            newFilePartitionId);
       } else {
 
         // check whether the file name needs to be renamed.
-        if (subsequentIndex != sequenceFileTreeSet.size() || preIndex != -1) {
-          String newFileName = 
getFileNameForLoadingFile(tsfileToBeInserted.getName(), preIndex,
-              subsequentIndex, 
getTimePartitionFromTsFileResource(newTsFileResource));
+        if (!sequenceFileTreeSet.isEmpty()) {
+          String newFileName = 
getFileNameForLoadingFile(tsfileToBeInserted.getName(), insertPos,
+              getTimePartitionFromTsFileResource(newTsFileResource), 
sequenceList);
           if (!newFileName.equals(tsfileToBeInserted.getName())) {
             logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
                 tsfileToBeInserted.getName(), newFileName);
             newTsFileResource.setFile(new 
File(tsfileToBeInserted.getParentFile(), newFileName));
           }
         }
-        loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, 
newTsFileResource);
+        loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, 
newTsFileResource,
+            newFilePartitionId);
       }
 
       // update latest time map
       updateLatestTimeMap(newTsFileResource);
-      allDirectFileVersions.addAll(newTsFileResource.getHistoricalVersions());
+      String[] filePathSplit = 
FilePathUtils.splitTsFilePath(newTsFileResource);
+      long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 
2]);
+      partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new 
HashSet<>())
+          .addAll(newTsFileResource.getHistoricalVersions());
     } catch (DiskSpaceInsufficientException e) {
       logger.error(
           "Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
           tsfileToBeInserted.getAbsolutePath(), tsfileToBeInserted.getParentFile().getName());
       IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
-      throw new TsFileProcessorException(e);
+      throw new LoadFileException(e);
     } finally {
       mergeLock.writeLock().unlock();
       writeUnlock();
@@ -1639,11 +1652,133 @@ public class StorageGroupProcessor {
   }
 
   /**
+   * Check and get the partition id of a TsFile to be inserted using the start 
times and end
+   * times of devices.
+   * TODO: when the partition violation happens, split the file and load into 
different partitions
+   * @throws LoadFileException if the data of the file cross partitions or it 
is empty
+   */
+  private long getNewFilePartitionId(TsFileResource resource) throws 
LoadFileException {
+    long partitionId = -1;
+    for (Long startTime : resource.getStartTimeMap().values()) {
+      long p = StorageEngine.fromTimeToTimePartition(startTime);
+      if (partitionId == -1) {
+        partitionId = p;
+      } else {
+        if (partitionId != p) {
+          throw new PartitionViolationException(resource);
+        }
+      }
+    }
+    for (Long endTime : resource.getEndTimeMap().values()) {
+      long p = StorageEngine.fromTimeToTimePartition(endTime);
+      if (partitionId == -1) {
+        partitionId = p;
+      } else {
+        if (partitionId != p) {
+          throw new PartitionViolationException(resource);
+        }
+      }
+    }
+    if (partitionId == -1) {
+      throw new LoadEmptyFileException();
+    }
+    return partitionId;
+  }
+
+  /**
+   * Find the position of "newTsFileResource" in the sequence files if it can 
be inserted into them.
+   * @param newTsFileResource
+   * @param newFilePartitionId
+   * @return POS_ALREADY_EXIST(-2) if some file has the same name as the one 
to be inserted
+   *         POS_OVERLAP(-3) if some file overlaps the new file
+   *         an insertion position i >= -1 if the new file can be inserted 
between [i, i+1]
+   */
+  private int findInsertionPosition(TsFileResource newTsFileResource, long 
newFilePartitionId,
+      List<TsFileResource> sequenceList) {
+    File tsfileToBeInserted = newTsFileResource.getFile();
+
+    int insertPos = -1;
+
+    // find the position where the new file should be inserted
+    for (int i = 0; i < sequenceList.size(); i++) {
+      TsFileResource localFile = sequenceList.get(i);
+      if (localFile.getFile().getName().equals(tsfileToBeInserted.getName())) {
+        return POS_ALREADY_EXIST;
+      }
+      long localPartitionId = 
Long.parseLong(localFile.getFile().getParentFile().getName());
+      if (i == sequenceList.size() - 1 && localFile.getEndTimeMap().isEmpty()
+          || newFilePartitionId > localPartitionId) {
+        // skip files that are in the previous partition and the last empty 
file, as the all data
+        // in those files must be older than the new file
+        continue;
+      }
+
+      int fileComparison = compareTsFileDevices(newTsFileResource, localFile);
+      switch (fileComparison) {
+        case 0:
+          // some devices are newer but some devices are older, the two files 
overlap in general
+          return POS_OVERLAP;
+        case -1:
+          // all devices in localFile are newer than the new file, the new 
file can be
+          // inserted before localFile
+          return i - 1;
+        default:
+          // all devices in the local file are older than the new file, 
proceed to the next file
+          insertPos = i;
+      }
+    }
+    return insertPos;
+  }
+
+  /**
+   * Compare each device in the two files to find the time relation of them.
+   * @param fileA
+   * @param fileB
+   * @return -1 if fileA is totally older than fileB (A < B)
+   *          0 if fileA is partially older than fileB and partially newer 
than fileB (A X B)
+   *          1 if fileA is totally newer than fileB (B < A)
+   */
+  private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) 
{
+    boolean hasPre = false, hasSubsequence = false;
+    for (String device : fileA.getStartTimeMap().keySet()) {
+      if (!fileB.getStartTimeMap().containsKey(device)) {
+        continue;
+      }
+      long startTimeA = fileA.getStartTimeMap().get(device);
+      long endTimeA = fileA.getEndTimeMap().get(device);
+      long startTimeB = fileB.getStartTimeMap().get(device);
+      long endTimeB = fileB.getEndTimeMap().get(device);
+      if (startTimeA > endTimeB) {
+        // A's data of the device is later than to the B's data
+        hasPre = true;
+      } else if (startTimeB > endTimeA) {
+        // A's data of the device is previous to the B's data
+        hasSubsequence = true;
+      } else {
+        // the two files overlap in the device
+        return 0;
+      }
+    }
+    if (hasPre && hasSubsequence) {
+      // some devices are newer but some devices are older, the two files 
overlap in general
+      return 0;
+    }
+    if (!hasPre && hasSubsequence) {
+      // all devices in B are newer than those in A
+      return -1;
+    }
+    // all devices in B are older than those in A
+    return 1;
+  }
+
+  /**
    * If the historical versions of a file is a sub-set of the given file's, 
remove it to reduce
    * unnecessary merge. Only used when the file sender and the receiver share 
the same file
    * close policy.
+   * Warning: DO NOT REMOVE
    * @param resource
    */
+  @SuppressWarnings("unused")
   public void removeFullyOverlapFiles(TsFileResource resource) {
     writeLock();
     closeQueryLock.writeLock().lock();
@@ -1702,24 +1837,25 @@ public class StorageGroupProcessor {
    * version number is the version number in the tsfile with a larger 
timestamp.
    *
    * @param tsfileName origin tsfile name
+   * @param insertIndex the new file will be inserted between the files 
[insertIndex, insertIndex
+   *                   + 1]
    * @return appropriate filename
    */
-  private String getFileNameForLoadingFile(String tsfileName, int preIndex, 
int subsequentIndex,
-      long timePartitionId) {
+  private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
+      long timePartitionId, List<TsFileResource> sequenceList) {
     long currentTsFileTime = Long
         .parseLong(tsfileName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]);
     long preTime;
-    List<TsFileResource> sequenceList = new ArrayList<>(sequenceFileTreeSet);
-    if (preIndex == -1) {
+    if (insertIndex == -1) {
       preTime = 0L;
     } else {
-      String preName = sequenceList.get(preIndex).getFile().getName();
+      String preName = sequenceList.get(insertIndex).getFile().getName();
       preTime = 
Long.parseLong(preName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]);
     }
-    if (subsequentIndex == sequenceFileTreeSet.size()) {
+    if (insertIndex == sequenceFileTreeSet.size() - 1) {
       return preTime < currentTsFileTime ? tsfileName : 
getNewTsFileName(timePartitionId);
     } else {
-      String subsequenceName = 
sequenceList.get(subsequentIndex).getFile().getName();
+      String subsequenceName = sequenceList.get(insertIndex + 
1).getFile().getName();
       long subsequenceTime = Long
           
.parseLong(subsequenceName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]);
       long subsequenceVersion = Long
@@ -1767,19 +1903,18 @@ public class StorageGroupProcessor {
    *
    * @param type load type
    * @param tsFileResource tsfile resource to be loaded
+   * @param filePartitionId the partition id of the new file
    * @UsedBy sync module, load external tsfile module.
    * @return load the file successfully
    */
   private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile,
-      TsFileResource tsFileResource)
-      throws TsFileProcessorException, DiskSpaceInsufficientException {
+      TsFileResource tsFileResource, long filePartitionId)
+      throws LoadFileException, DiskSpaceInsufficientException {
     File targetFile;
-    long timeRangeId = StorageEngine.fromTimeToTimePartition(
-        
tsFileResource.getStartTimeMap().entrySet().iterator().next().getValue());
     switch (type) {
       case LOAD_UNSEQUENCE:
         targetFile = new 
File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
-            storageGroupName + File.separatorChar + timeRangeId + 
File.separator + tsFileResource
+            storageGroupName + File.separatorChar + filePartitionId + 
File.separator + tsFileResource
                 .getFile().getName());
         tsFileResource.setFile(targetFile);
         if(unSequenceFileList.contains(tsFileResource)){
@@ -1793,7 +1928,7 @@ public class StorageGroupProcessor {
       case LOAD_SEQUENCE:
         targetFile =
             new 
File(DirectoryManager.getInstance().getNextFolderForSequenceFile(),
-                storageGroupName + File.separatorChar + timeRangeId + 
File.separator
+                storageGroupName + File.separatorChar + filePartitionId + 
File.separator
                     + tsFileResource.getFile().getName());
         tsFileResource.setFile(targetFile);
         if(sequenceFileTreeSet.contains(tsFileResource)){
@@ -1805,7 +1940,7 @@ public class StorageGroupProcessor {
             syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath());
         break;
       default:
-        throw new TsFileProcessorException(
+        throw new LoadFileException(
             String.format("Unsupported type of loading tsfile : %s", type));
     }
 
@@ -1818,7 +1953,7 @@ public class StorageGroupProcessor {
     } catch (IOException e) {
       logger.error("File renaming failed when loading tsfile. Origin: {}, Target: {}",
           syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e);
-      throw new TsFileProcessorException(String.format(
+      throw new LoadFileException(String.format(
           "File renaming failed when loading tsfile. Origin: %s, Target: %s, because %s",
           syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage()));
     }
@@ -1832,11 +1967,13 @@ public class StorageGroupProcessor {
     } catch (IOException e) {
       logger.error("File renaming failed when loading .resource file. Origin: {}, Target: {}",
           syncedResourceFile.getAbsolutePath(), targetResourceFile.getAbsolutePath(), e);
-      throw new TsFileProcessorException(String.format(
+      throw new LoadFileException(String.format(
           "File renaming failed when loading .resource file. Origin: %s, Target: %s, because %s",
           syncedResourceFile.getAbsolutePath(), targetResourceFile.getAbsolutePath(),
           e.getMessage()));
     }
+    partitionDirectFileVersions.computeIfAbsent(filePartitionId,
+        p -> new HashSet<>()).addAll(tsFileResource.getHistoricalVersions());
     return true;
   }
 
@@ -1985,8 +2122,9 @@ public class StorageGroupProcessor {
     return storageGroupName;
   }
 
-  public boolean isFileAlreadyExist(TsFileResource tsFileResource) {
-    return allDirectFileVersions.containsAll(tsFileResource.getHistoricalVersions());
+  public boolean isFileAlreadyExist(TsFileResource tsFileResource, long partitionNum) {
+    return partitionDirectFileVersions.getOrDefault(partitionNum, Collections.emptySet())
+        .containsAll(tsFileResource.getHistoricalVersions());
   }
 
   @FunctionalInterface
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/LoadEmptyFileException.java b/server/src/main/java/org/apache/iotdb/db/exception/LoadEmptyFileException.java
new file mode 100644
index 0000000..9ba22e2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/LoadEmptyFileException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.iotdb.db.exception;
+
+public class LoadEmptyFileException extends LoadFileException {
+
+  public LoadEmptyFileException() {
+    super("Cannot load an empty file");
+  }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/LoadFileException.java b/server/src/main/java/org/apache/iotdb/db/exception/LoadFileException.java
new file mode 100644
index 0000000..3af898d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/LoadFileException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception;
+
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class LoadFileException extends IoTDBException {
+
+  public LoadFileException(String message) {
+    super(message, TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
+  }
+
+  public LoadFileException(Exception exception) {
+    super(exception, TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java b/server/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java
new file mode 100644
index 0000000..c794b61
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception;
+
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+
+public class PartitionViolationException extends LoadFileException{
+
+  public PartitionViolationException(TsFileResource resource) {
+    super(String.format("The data of file %s cross partitions", resource));
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
index b7cb388..c6a07be 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
@@ -26,9 +26,9 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.SyncDeviceOwnerConflictException;
-import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.sync.conf.SyncConstant;
 import org.apache.iotdb.db.utils.FileLoaderUtils;
 import org.slf4j.Logger;
@@ -139,7 +139,7 @@ public class FileLoader implements IFileLoader {
       StorageEngine.getInstance().loadNewTsFileForSync(tsFileResource);
     } catch (SyncDeviceOwnerConflictException e) {
       LOGGER.error("Device owner has conflicts, so skip the loading file", e);
-    } catch (TsFileProcessorException | StorageEngineException e) {
+    } catch (LoadFileException | StorageEngineException e) {
       LOGGER.error("Can not load new tsfile {}", newTsFile.getAbsolutePath(), e);
       throw new IOException(e);
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java
index 1f377c5..4b01523 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java
@@ -186,7 +186,7 @@ public class IoTDBLoadExternalTsfileTest {
               .getSequenceFileTreeSet());
       File tmpDir = new File(
           resources.get(0).getFile().getParentFile().getParentFile().getParentFile(),
-          "tmp" + File.separator + "root.vehicle");
+          "tmp" + File.separator + "root.vehicle" + File.separator + "0");
       if (!tmpDir.exists()) {
         tmpDir.mkdirs();
       }
@@ -199,7 +199,7 @@ public class IoTDBLoadExternalTsfileTest {
           StorageEngine.getInstance().getProcessor("root.test")
               .getSequenceFileTreeSet());
       tmpDir = new File(resources.get(0).getFile().getParentFile().getParentFile().getParentFile(),
-          "tmp" + File.separator + "root.test");
+          "tmp" + File.separator + "root.test" + File.separator + "0");
       if (!tmpDir.exists()) {
         tmpDir.mkdirs();
       }
@@ -220,8 +220,8 @@ public class IoTDBLoadExternalTsfileTest {
               .getSequenceFileTreeSet());
       assertEquals(2, resources.size());
       assertNotNull(tmpDir.listFiles());
-      assertEquals(0, new File(tmpDir, "root.vehicle").listFiles().length);
-      assertEquals(0, new File(tmpDir, "root.test").listFiles().length);
+      assertEquals(0, new File(tmpDir, "root.vehicle" + File.separator + "0").listFiles().length);
+      assertEquals(0, new File(tmpDir, "root.test" + File.separator + "0").listFiles().length);
     } catch (StorageEngineException e) {
       Assert.fail();
     }
@@ -289,7 +289,7 @@ public class IoTDBLoadExternalTsfileTest {
       assertEquals(2, resources.size());
       File tmpDir = new File(
           resources.get(0).getFile().getParentFile().getParentFile().getParentFile(),
-          "tmp" + File.separator + "root.vehicle");
+          "tmp" + File.separator + "root.vehicle" + File.separator + "0");
       if (!tmpDir.exists()) {
         tmpDir.mkdirs();
       }
@@ -307,7 +307,7 @@ public class IoTDBLoadExternalTsfileTest {
       resources = new ArrayList<>(
           StorageEngine.getInstance().getProcessor("root.test").getSequenceFileTreeSet());
       assertEquals(2, resources.size());
-      tmpDir = new File(tmpDir.getParent(), "root.test");
+      tmpDir = new File(tmpDir.getParentFile().getParentFile(), "root.test" + File.separator + "0");
       if (!tmpDir.exists()) {
         tmpDir.mkdirs();
       }
@@ -322,7 +322,7 @@ public class IoTDBLoadExternalTsfileTest {
       }
 
       // load all tsfile in tmp dir
-      tmpDir = tmpDir.getParentFile();
+      tmpDir = tmpDir.getParentFile().getParentFile();
       statement.execute(String.format("load %s", tmpDir.getAbsolutePath()));
       assertEquals(2, StorageEngine.getInstance().getProcessor("root.vehicle")
           .getSequenceFileTreeSet().size());
@@ -333,8 +333,8 @@ public class IoTDBLoadExternalTsfileTest {
       assertEquals(3, StorageEngine.getInstance().getProcessor("root.test")
           .getSequenceFileTreeSet().size());
       assertNotNull(tmpDir.listFiles());
-      assertEquals(0, new File(tmpDir, "root.vehicle").listFiles().length);
-      assertEquals(0, new File(tmpDir, "root.test").listFiles().length);
+      assertEquals(0, new File(tmpDir, "root.vehicle"  + File.separator + "0").listFiles().length);
+      assertEquals(0, new File(tmpDir, "root.test"  + File.separator + "0").listFiles().length);
 
       // check query result
       hasResultSet = statement.execute("SELECT  * FROM root");
@@ -370,7 +370,7 @@ public class IoTDBLoadExternalTsfileTest {
 
       File tmpDir = new File(
           resources.get(0).getFile().getParentFile().getParentFile().getParentFile(),
-          "tmp" + File.separator + "root.vehicle");
+          "tmp" + File.separator + "root.vehicle" + File.separator + "0");
       if (!tmpDir.exists()) {
         tmpDir.mkdirs();
       }
@@ -383,7 +383,7 @@ public class IoTDBLoadExternalTsfileTest {
           StorageEngine.getInstance().getProcessor("root.test")
               .getSequenceFileTreeSet());
       tmpDir = new File(resources.get(0).getFile().getParentFile().getParentFile().getParentFile(),
-          "tmp" + File.separator + "root.test");
+          "tmp" + File.separator + "root.test" + File.separator + "0");
       if (!tmpDir.exists()) {
         tmpDir.mkdirs();
       }
@@ -432,7 +432,7 @@ public class IoTDBLoadExternalTsfileTest {
       Assert.assertTrue(hasError);
 
       // test load metadata automatically, it will succeed.
-      tmpDir = tmpDir.getParentFile();
+      tmpDir = tmpDir.getParentFile().getParentFile();
       statement.execute(String.format("load %s true 2", tmpDir.getAbsolutePath()));
       resources = new ArrayList<>(
           StorageEngine.getInstance().getProcessor("root.vehicle")
@@ -444,9 +444,10 @@ public class IoTDBLoadExternalTsfileTest {
       assertEquals(2, resources.size());
       assertEquals(2, tmpDir.listFiles().length);
       for (File dir : tmpDir.listFiles()) {
-        assertEquals(0, dir.listFiles().length);
+        assertEquals(0, dir.listFiles()[0].listFiles().length);
       }
     } catch (StorageEngineException e) {
+      e.printStackTrace();
       Assert.fail();
     }
   }
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 74710cd..b22c431 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -39,6 +39,7 @@ public enum TSStatusCode {
   STORAGE_ENGINE_ERROR(313),
   TSFILE_PROCESSOR_ERROR(314),
   PATH_ILLEGAL(315),
+  LOAD_FILE_ERROR(316),
 
   EXECUTE_STATEMENT_ERROR(400),
   SQL_PARSE_ERROR(401),

Reply via email to