This is an automated email from the ASF dual-hosted git repository. qiaojialin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push: new f6f216e Integrate data file version recording with time partitioning (#935) f6f216e is described below commit f6f216e888459dedf392cbe1d7754f097e11db91 Author: Jiang Tian <jt2594...@163.com> AuthorDate: Wed Mar 25 03:35:21 2020 -0500 Integrate data file version recording with time partitioning (#935) * integrate data partition with file version management --- .../org/apache/iotdb/db/engine/StorageEngine.java | 12 +- .../engine/storagegroup/StorageGroupProcessor.java | 394 ++++++++++++++------- .../iotdb/db/exception/LoadEmptyFileException.java | 29 ++ .../iotdb/db/exception/LoadFileException.java | 33 ++ .../db/exception/PartitionViolationException.java | 29 ++ .../iotdb/db/sync/receiver/load/FileLoader.java | 4 +- .../integration/IoTDBLoadExternalTsfileTest.java | 27 +- .../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 + 8 files changed, 380 insertions(+), 149 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index c63f728..c2da578 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -48,8 +48,8 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor; import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.exception.LoadFileException; import org.apache.iotdb.db.exception.StorageEngineException; -import org.apache.iotdb.db.exception.TsFileProcessorException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException; @@ -481,13 +481,13 @@ public class StorageEngine implements IService { } public void loadNewTsFileForSync(TsFileResource newTsFileResource) - throws TsFileProcessorException, StorageEngineException { + throws StorageEngineException, LoadFileException { getProcessor(newTsFileResource.getFile().getParentFile().getName()) .loadNewTsFileForSync(newTsFileResource); } public void loadNewTsFile(TsFileResource newTsFileResource) - throws TsFileProcessorException, StorageEngineException, MetadataException { + throws LoadFileException, StorageEngineException, MetadataException { Map<String, Long> startTimeMap = newTsFileResource.getStartTimeMap(); if (startTimeMap == null || startTimeMap.isEmpty()) { throw new StorageEngineException("Can not get the corresponding storage group."); @@ -549,10 +549,10 @@ public class StorageEngine implements IService { this.fileFlushPolicy = fileFlushPolicy; } - public boolean isFileAlreadyExist(TsFileResource tsFileResource, String storageGroup) { - // TODO-Cluster#350: integrate with time partitioning + public boolean isFileAlreadyExist(TsFileResource tsFileResource, String storageGroup, + long partitionNum) { StorageGroupProcessor processor = processorMap.get(storageGroup); - return processor != null && processor.isFileAlreadyExist(tsFileResource); + return processor != null && processor.isFileAlreadyExist(tsFileResource, partitionNum); } public static long getTimePartitionInterval() { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 295a074..d6b2c98 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -18,6 +18,28 @@ */ package org.apache.iotdb.db.engine.storagegroup; +import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; +import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX; +import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.io.FileUtils; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -40,7 +62,14 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.engine.version.SimpleFileVersionController; import org.apache.iotdb.db.engine.version.VersionController; -import org.apache.iotdb.db.exception.*; +import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; +import org.apache.iotdb.db.exception.LoadEmptyFileException; +import org.apache.iotdb.db.exception.LoadFileException; +import org.apache.iotdb.db.exception.MergeException; +import org.apache.iotdb.db.exception.PartitionViolationException; +import org.apache.iotdb.db.exception.StorageGroupProcessorException; +import org.apache.iotdb.db.exception.TsFileProcessorException; +import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.OutOfTTLException; import org.apache.iotdb.db.exception.query.QueryProcessException; @@ -53,6 +82,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryFileManager; import org.apache.iotdb.db.utils.CopyOnReadLinkedList; +import org.apache.iotdb.db.utils.FilePathUtils; import org.apache.iotdb.db.utils.UpgradeUtils; import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer; import org.apache.iotdb.rpc.RpcUtils; @@ -73,17 +103,6 @@ import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; -import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX; -import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; - /** * For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one @@ -110,6 +129,15 @@ public class StorageGroupProcessor { private static final String MERGING_MODIFICATION_FILE_NAME = "merge.mods"; private static final Logger logger = LoggerFactory.getLogger(StorageGroupProcessor.class); private static final int MAX_CACHE_SENSORS = 5000; + + /** + * indicating the file to be loaded already exists locally. + */ + private static final int POS_ALREADY_EXIST = -2; + /** + * indicating the file to be loaded overlap with some files. + */ + private static final int POS_OVERLAP = -3; /** * a read write lock for guaranteeing concurrent safety when accessing all fields in this class * (i.e., schema, (un)sequenceFileList, work(un)SequenceTsFileProcessor, @@ -203,9 +231,16 @@ public class StorageGroupProcessor { private FSFactory fsFactory = FSFactoryProducer.getFSFactory(); private TsFileFlushPolicy fileFlushPolicy; - // allDirectFileVersions records the versions of the direct TsFiles (generated by flush), not - // including the files generated by merge - private Set<Long> allDirectFileVersions = new HashSet<>(); + /** + * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close, + * not including the files generated by merge) of each partition. + * As data file close is managed by the leader in the distributed version, the files with the + * same version(s) have the same data, despite that the inner structure (the size and + * organization of chunks) may be different, so we can easily find what remote files we do not + * have locally. + * partition number -> version number set + */ + private Map<Long, Set<Long>> partitionDirectFileVersions = new HashMap<>(); public StorageGroupProcessor(String systemInfoDir, String storageGroupName, TsFileFlushPolicy fileFlushPolicy) @@ -246,14 +281,18 @@ public class StorageGroupProcessor { if (resource.getFile().length() == 0) { deleteTsfile(resource.getFile()); } - allDirectFileVersions.addAll(resource.getHistoricalVersions()); + String[] filePathSplit = FilePathUtils.splitTsFilePath(resource); + long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]); + partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions()); } for (TsFileResource resource : unseqTsFiles) { //After recover, case the TsFile's length is equal to 0, delete both the TsFileResource and the file itself if (resource.getFile().length() == 0) { deleteTsfile(resource.getFile()); } - allDirectFileVersions.addAll(resource.getHistoricalVersions()); + String[] filePathSplit = FilePathUtils.splitTsFilePath(resource); + long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]); + partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions()); } String taskName = storageGroupName + "-" + System.currentTimeMillis(); @@ -307,17 +346,15 @@ public class StorageGroupProcessor { * @return version controller */ private VersionController getVersionControllerByTimePartitionId(long timePartitionId) { - VersionController res = timePartitionIdVersionControllerMap.get(timePartitionId); - if (res == null) { - try { - res = new SimpleFileVersionController(storageGroupSysDir.getPath(), timePartitionId); - timePartitionIdVersionControllerMap.put(timePartitionId, res); - } catch (IOException e) { - logger.error("can't build a version controller for time partition" + timePartitionId); - } - } - - return res; + return timePartitionIdVersionControllerMap.computeIfAbsent(timePartitionId, + id -> { + try { + return new SimpleFileVersionController(storageGroupSysDir.getPath(), timePartitionId); + } catch (IOException e) { + logger.error("can't build a version controller for time partition {}", timePartitionId); + return null; + } + }); } private List<TsFileResource> getAllFiles(List<String> folders) { @@ -328,17 +365,20 @@ public class StorageGroupProcessor { continue; } - for (File timeRangeFileFolder : fileFolder.listFiles()) { - // some TsFileResource may be being persisted when the system crashed, try recovering such - // resources - continueFailedRenames(timeRangeFileFolder, TEMP_SUFFIX); + File[] subFiles = fileFolder.listFiles(); + if (subFiles != null) { + for (File partitionFolder : subFiles) { + // some TsFileResource may be being persisted when the system crashed, try recovering such + // resources + continueFailedRenames(partitionFolder, TEMP_SUFFIX); - // some TsFiles were going to be replaced by the merged files when the system crashed and - // the process was interrupted before the merged files could be named - continueFailedRenames(timeRangeFileFolder, MERGE_SUFFIX); + // some TsFiles were going to be replaced by the merged files when the system crashed and + // the process was interrupted before the merged files could be named + continueFailedRenames(partitionFolder, MERGE_SUFFIX); - Collections.addAll(tsFiles, - fsFactory.listFilesBySuffix(timeRangeFileFolder.getAbsolutePath(), TSFILE_SUFFIX)); + Collections.addAll(tsFiles, + fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), TSFILE_SUFFIX)); + } } } @@ -796,12 +836,12 @@ public class StorageGroupProcessor { * @return file name */ private String getNewTsFileName(long timePartitionId) { - return getNewTsFileName(System.currentTimeMillis(), - getVersionControllerByTimePartitionId(timePartitionId).nextVersion(), 0); + long version = getVersionControllerByTimePartitionId(timePartitionId).nextVersion(); + partitionDirectFileVersions.computeIfAbsent(timePartitionId, p -> new HashSet<>()).add(version); + return getNewTsFileName(System.currentTimeMillis(), version, 0); } private String getNewTsFileName(long time, long version, int mergeCnt) { - allDirectFileVersions.add(version); return time + IoTDBConstant.TSFILE_NAME_SEPARATOR + version + IoTDBConstant.TSFILE_NAME_SEPARATOR + mergeCnt + TSFILE_SUFFIX; } @@ -1185,22 +1225,7 @@ public class StorageGroupProcessor { // time partition to divide storage group long timePartitionId = StorageEngine.fromTimeToTimePartition(timestamp); // write log - if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) { - DeletePlan deletionPlan = new DeletePlan(timestamp, new Path(deviceId, measurementId)); - for (Map.Entry<Long, TsFileProcessor> entry : workSequenceTsFileProcessors.entrySet()) { - if (entry.getKey() <= timePartitionId) { - entry.getValue().getLogNode() - .write(deletionPlan); - } - } - - for (Map.Entry<Long, TsFileProcessor> entry : workUnsequenceTsFileProcessors.entrySet()) { - if (entry.getKey() <= timePartitionId) { - entry.getValue().getLogNode() - .write(deletionPlan); - } - } - } + logDeletion(timestamp, deviceId, measurementId, timePartitionId); Path fullPath = new Path(deviceId, measurementId); Deletion deletion = new Deletion(fullPath, @@ -1225,6 +1250,24 @@ public class StorageGroupProcessor { } } + private void logDeletion(long timestamp, String deviceId, String measurementId, long timePartitionId) + throws IOException { + if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) { + DeletePlan deletionPlan = new DeletePlan(timestamp, new Path(deviceId, measurementId)); + for (Map.Entry<Long, TsFileProcessor> entry : workSequenceTsFileProcessors.entrySet()) { + if (entry.getKey() <= timePartitionId) { + entry.getValue().getLogNode().write(deletionPlan); + } + } + + for (Map.Entry<Long, TsFileProcessor> entry : workUnsequenceTsFileProcessors.entrySet()) { + if (entry.getKey() <= timePartitionId) { + entry.getValue().getLogNode().write(deletionPlan); + } + } + } + } + private void deleteDataInFiles(Collection<TsFileResource> tsFileResourceList, Deletion deletion, List<ModificationFile> updatedModFiles) @@ -1521,13 +1564,14 @@ public class StorageGroupProcessor { * @param newTsFileResource tsfile resource * @UsedBy sync module. */ - public void loadNewTsFileForSync(TsFileResource newTsFileResource) - throws TsFileProcessorException { + public void loadNewTsFileForSync(TsFileResource newTsFileResource) throws LoadFileException { File tsfileToBeInserted = newTsFileResource.getFile(); + long newFilePartitionId = getNewFilePartitionId(newTsFileResource); writeLock(); mergeLock.writeLock().lock(); try { - if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource)){ + if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource, + newFilePartitionId)){ updateLatestTimeMap(newTsFileResource); } } catch (DiskSpaceInsufficientException e) { @@ -1535,7 +1579,7 @@ public class StorageGroupProcessor { "Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.", tsfileToBeInserted.getAbsolutePath(), tsfileToBeInserted.getParentFile().getName()); IoTDBDescriptor.getInstance().getConfig().setReadOnly(true); - throw new TsFileProcessorException(e); + throw new LoadFileException(e); } finally { mergeLock.writeLock().unlock(); writeUnlock(); @@ -1556,82 +1600,51 @@ public class StorageGroupProcessor { * @param newTsFileResource tsfile resource * @UsedBy load external tsfile module */ - public void loadNewTsFile(TsFileResource newTsFileResource) - throws TsFileProcessorException { + public void loadNewTsFile(TsFileResource newTsFileResource) throws LoadFileException { File tsfileToBeInserted = newTsFileResource.getFile(); + long newFilePartitionId = getNewFilePartitionId(newTsFileResource); writeLock(); mergeLock.writeLock().lock(); try { - boolean isOverlap = false; - int preIndex = -1, subsequentIndex = sequenceFileTreeSet.size(); - List<TsFileResource> sequenceList = new ArrayList<>(sequenceFileTreeSet); - // check new tsfile - outer: - for (int i = 0; i < sequenceList.size(); i++) { - if (sequenceList.get(i).getFile().getName().equals(tsfileToBeInserted.getName())) { - return; - } - if (i == sequenceList.size() - 1 && sequenceList.get(i).getEndTimeMap().isEmpty()) { - continue; - } - boolean hasPre = false, hasSubsequence = false; - for (String device : newTsFileResource.getStartTimeMap().keySet()) { - if (sequenceList.get(i).getStartTimeMap().containsKey(device)) { - long startTime1 = sequenceList.get(i).getStartTimeMap().get(device); - long endTime1 = sequenceList.get(i).getEndTimeMap().get(device); - long startTime2 = newTsFileResource.getStartTimeMap().get(device); - long endTime2 = newTsFileResource.getEndTimeMap().get(device); - if (startTime1 > endTime2) { - hasSubsequence = true; - } else if (startTime2 > endTime1) { - hasPre = true; - } else { - isOverlap = true; - break outer; - } - } - } - if (hasPre && hasSubsequence) { - isOverlap = true; - break; - } - if (!hasPre && hasSubsequence) { - subsequentIndex = i; - break; - } - if (hasPre) { - preIndex = i; - } + + int insertPos = findInsertionPosition(newTsFileResource, newFilePartitionId, sequenceList); + if (insertPos == POS_ALREADY_EXIST) { + return; } // loading tsfile by type - if (isOverlap) { - loadTsFileByType(LoadTsFileType.LOAD_UNSEQUENCE, tsfileToBeInserted, newTsFileResource); + if (insertPos == POS_OVERLAP) { + loadTsFileByType(LoadTsFileType.LOAD_UNSEQUENCE, tsfileToBeInserted, newTsFileResource, + newFilePartitionId); } else { // check whether the file name needs to be renamed. - if (subsequentIndex != sequenceFileTreeSet.size() || preIndex != -1) { - String newFileName = getFileNameForLoadingFile(tsfileToBeInserted.getName(), preIndex, - subsequentIndex, getTimePartitionFromTsFileResource(newTsFileResource)); + if (!sequenceFileTreeSet.isEmpty()) { + String newFileName = getFileNameForLoadingFile(tsfileToBeInserted.getName(), insertPos, + getTimePartitionFromTsFileResource(newTsFileResource), sequenceList); if (!newFileName.equals(tsfileToBeInserted.getName())) { logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.", tsfileToBeInserted.getName(), newFileName); newTsFileResource.setFile(new File(tsfileToBeInserted.getParentFile(), newFileName)); } } - loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource); + loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource, + newFilePartitionId); } // update latest time map updateLatestTimeMap(newTsFileResource); - allDirectFileVersions.addAll(newTsFileResource.getHistoricalVersions()); + String[] filePathSplit = FilePathUtils.splitTsFilePath(newTsFileResource); + long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]); + partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()) + .addAll(newTsFileResource.getHistoricalVersions()); } catch (DiskSpaceInsufficientException e) { logger.error( "Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.", tsfileToBeInserted.getAbsolutePath(), tsfileToBeInserted.getParentFile().getName()); IoTDBDescriptor.getInstance().getConfig().setReadOnly(true); - throw new TsFileProcessorException(e); + throw new LoadFileException(e); } finally { mergeLock.writeLock().unlock(); writeUnlock(); @@ -1639,11 +1652,133 @@ public class StorageGroupProcessor { } /** + * Check and get the partition id of a TsFile to be inserted using the start times and end + * times of devices. + * TODO: when the partition violation happens, split the file and load into different partitions + * @throws LoadFileException if the data of the file cross partitions or it is empty + */ + private long getNewFilePartitionId(TsFileResource resource) throws LoadFileException { + long partitionId = -1; + for (Long startTime : resource.getStartTimeMap().values()) { + long p = StorageEngine.fromTimeToTimePartition(startTime); + if (partitionId == -1) { + partitionId = p; + } else { + if (partitionId != p) { + throw new PartitionViolationException(resource); + } + } + } + for (Long endTime : resource.getEndTimeMap().values()) { + long p = StorageEngine.fromTimeToTimePartition(endTime); + if (partitionId == -1) { + partitionId = p; + } else { + if (partitionId != p) { + throw new PartitionViolationException(resource); + } + } + } + if (partitionId == -1) { + throw new LoadEmptyFileException(); + } + return partitionId; + } + + /** + * Find the position of "newTsFileResource" in the sequence files if it can be inserted into them. + * @param newTsFileResource + * @param newFilePartitionId + * @return POS_ALREADY_EXIST(-2) if some file has the same name as the one to be inserted + * POS_OVERLAP(-3) if some file overlaps the new file + * an insertion position i >= -1 if the new file can be inserted between [i, i+1] + */ + private int findInsertionPosition(TsFileResource newTsFileResource, long newFilePartitionId, + List<TsFileResource> sequenceList) { + File tsfileToBeInserted = newTsFileResource.getFile(); + + int insertPos = -1; + + // find the position where the new file should be inserted + for (int i = 0; i < sequenceList.size(); i++) { + TsFileResource localFile = sequenceList.get(i); + if (localFile.getFile().getName().equals(tsfileToBeInserted.getName())) { + return POS_ALREADY_EXIST; + } + long localPartitionId = Long.parseLong(localFile.getFile().getParentFile().getName()); + if (i == sequenceList.size() - 1 && localFile.getEndTimeMap().isEmpty() + || newFilePartitionId > localPartitionId) { + // skip files that are in the previous partition and the last empty file, as the all data + // in those files must be older than the new file + continue; + } + + int fileComparison = compareTsFileDevices(newTsFileResource, localFile); + switch (fileComparison) { + case 0: + // some devices are newer but some devices are older, the two files overlap in general + return POS_OVERLAP; + case -1: + // all devices in localFile are newer than the new file, the new file can be + // inserted before localFile + return i - 1; + default: + // all devices in the local file are older than the new file, proceed to the next file + insertPos = i; + } + } + return insertPos; + } + + /** + * Compare each device in the two files to find the time relation of them. + * @param fileA + * @param fileB + * @return -1 if fileA is totally older than fileB (A < B) + * 0 if fileA is partially older than fileB and partially newer than fileB (A X B) + * 1 if fileA is totally newer than fileB (B < A) + */ + private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) { + boolean hasPre = false, hasSubsequence = false; + for (String device : fileA.getStartTimeMap().keySet()) { + if (!fileB.getStartTimeMap().containsKey(device)) { + continue; + } + long startTimeA = fileA.getStartTimeMap().get(device); + long endTimeA = fileA.getEndTimeMap().get(device); + long startTimeB = fileB.getStartTimeMap().get(device); + long endTimeB = fileB.getEndTimeMap().get(device); + if (startTimeA > endTimeB) { + // A's data of the device is later than to the B's data + hasPre = true; + } else if (startTimeB > endTimeA) { + // A's data of the device is previous to the B's data + hasSubsequence = true; + } else { + // the two files overlap in the device + return 0; + } + } + if (hasPre && hasSubsequence) { + // some devices are newer but some devices are older, the two files overlap in general + return 0; + } + if (!hasPre && hasSubsequence) { + // all devices in B are newer than those in A + return -1; + } + // all devices in B are older than those in A + return 1; + } + + /** * If the historical versions of a file is a sub-set of the given file's, remove it to reduce * unnecessary merge. Only used when the file sender and the receiver share the same file * close policy. + * Warning: DO NOT REMOVE * @param resource */ + @SuppressWarnings("unused") public void removeFullyOverlapFiles(TsFileResource resource) { writeLock(); closeQueryLock.writeLock().lock(); @@ -1702,24 +1837,25 @@ public class StorageGroupProcessor { * version number is the version number in the tsfile with a larger timestamp. * * @param tsfileName origin tsfile name + * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex + * + 1] * @return appropriate filename */ - private String getFileNameForLoadingFile(String tsfileName, int preIndex, int subsequentIndex, - long timePartitionId) { + private String getFileNameForLoadingFile(String tsfileName, int insertIndex, + long timePartitionId, List<TsFileResource> sequenceList) { long currentTsFileTime = Long .parseLong(tsfileName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]); long preTime; - List<TsFileResource> sequenceList = new ArrayList<>(sequenceFileTreeSet); - if (preIndex == -1) { + if (insertIndex == -1) { preTime = 0L; } else { - String preName = sequenceList.get(preIndex).getFile().getName(); + String preName = sequenceList.get(insertIndex).getFile().getName(); preTime = Long.parseLong(preName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]); } - if (subsequentIndex == sequenceFileTreeSet.size()) { + if (insertIndex == sequenceFileTreeSet.size() - 1) { return preTime < currentTsFileTime ? tsfileName : getNewTsFileName(timePartitionId); } else { - String subsequenceName = sequenceList.get(subsequentIndex).getFile().getName(); + String subsequenceName = sequenceList.get(insertIndex + 1).getFile().getName(); long subsequenceTime = Long .parseLong(subsequenceName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]); long subsequenceVersion = Long @@ -1767,19 +1903,18 @@ public class StorageGroupProcessor { * * @param type load type * @param tsFileResource tsfile resource to be loaded + * @param filePartitionId the partition id of the new file * @UsedBy sync module, load external tsfile module. * @return load the file successfully */ private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile, - TsFileResource tsFileResource) - throws TsFileProcessorException, DiskSpaceInsufficientException { + TsFileResource tsFileResource, long filePartitionId) + throws LoadFileException, DiskSpaceInsufficientException { File targetFile; - long timeRangeId = StorageEngine.fromTimeToTimePartition( - tsFileResource.getStartTimeMap().entrySet().iterator().next().getValue()); switch (type) { case LOAD_UNSEQUENCE: targetFile = new File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(), - storageGroupName + File.separatorChar + timeRangeId + File.separator + tsFileResource + storageGroupName + File.separatorChar + filePartitionId + File.separator + tsFileResource .getFile().getName()); tsFileResource.setFile(targetFile); if(unSequenceFileList.contains(tsFileResource)){ @@ -1793,7 +1928,7 @@ public class StorageGroupProcessor { case LOAD_SEQUENCE: targetFile = new File(DirectoryManager.getInstance().getNextFolderForSequenceFile(), - storageGroupName + File.separatorChar + timeRangeId + File.separator + storageGroupName + File.separatorChar + filePartitionId + File.separator + tsFileResource.getFile().getName()); tsFileResource.setFile(targetFile); if(sequenceFileTreeSet.contains(tsFileResource)){ @@ -1805,7 +1940,7 @@ public class StorageGroupProcessor { syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath()); break; default: - throw new TsFileProcessorException( + throw new LoadFileException( String.format("Unsupported type of loading tsfile : %s", type)); } @@ -1818,7 +1953,7 @@ public class StorageGroupProcessor { } catch (IOException e) { logger.error("File renaming failed when loading tsfile. Origin: {}, Target: {}", syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e); - throw new TsFileProcessorException(String.format( + throw new LoadFileException(String.format( "File renaming failed when loading tsfile. Origin: %s, Target: %s, because %s", syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage())); } @@ -1832,11 +1967,13 @@ public class StorageGroupProcessor { } catch (IOException e) { logger.error("File renaming failed when loading .resource file. Origin: {}, Target: {}", syncedResourceFile.getAbsolutePath(), targetResourceFile.getAbsolutePath(), e); - throw new TsFileProcessorException(String.format( + throw new LoadFileException(String.format( "File renaming failed when loading .resource file. Origin: %s, Target: %s, because %s", syncedResourceFile.getAbsolutePath(), targetResourceFile.getAbsolutePath(), e.getMessage())); } + partitionDirectFileVersions.computeIfAbsent(filePartitionId, + p -> new HashSet<>()).addAll(tsFileResource.getHistoricalVersions()); return true; } @@ -1985,8 +2122,9 @@ public class StorageGroupProcessor { return storageGroupName; } - public boolean isFileAlreadyExist(TsFileResource tsFileResource) { - return allDirectFileVersions.containsAll(tsFileResource.getHistoricalVersions()); + public boolean isFileAlreadyExist(TsFileResource tsFileResource, long partitionNum) { + return partitionDirectFileVersions.getOrDefault(partitionNum, Collections.emptySet()) + .containsAll(tsFileResource.getHistoricalVersions()); } @FunctionalInterface diff --git a/server/src/main/java/org/apache/iotdb/db/exception/LoadEmptyFileException.java b/server/src/main/java/org/apache/iotdb/db/exception/LoadEmptyFileException.java new file mode 100644 index 0000000..9ba22e2 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/exception/LoadEmptyFileException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.iotdb.db.exception; + +public class LoadEmptyFileException extends LoadFileException { + + public LoadEmptyFileException() { + super("Cannot load an empty file"); + } + +} diff --git a/server/src/main/java/org/apache/iotdb/db/exception/LoadFileException.java b/server/src/main/java/org/apache/iotdb/db/exception/LoadFileException.java new file mode 100644 index 0000000..3af898d --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/exception/LoadFileException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.exception; + +import org.apache.iotdb.rpc.TSStatusCode; + +public class LoadFileException extends IoTDBException { + + public LoadFileException(String message) { + super(message, TSStatusCode.LOAD_FILE_ERROR.getStatusCode()); + } + + public LoadFileException(Exception exception) { + super(exception, TSStatusCode.LOAD_FILE_ERROR.getStatusCode()); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java b/server/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java new file mode 100644 index 0000000..c794b61 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.exception; + +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; + +public class PartitionViolationException extends LoadFileException{ + + public PartitionViolationException(TsFileResource resource) { + super(String.format("The data of file %s cross partitions", resource)); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java index b7cb388..c6a07be 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java @@ -26,9 +26,9 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.exception.LoadFileException; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.SyncDeviceOwnerConflictException; -import org.apache.iotdb.db.exception.TsFileProcessorException; import org.apache.iotdb.db.sync.conf.SyncConstant; import org.apache.iotdb.db.utils.FileLoaderUtils; import org.slf4j.Logger; @@ -139,7 +139,7 @@ public class FileLoader implements IFileLoader { StorageEngine.getInstance().loadNewTsFileForSync(tsFileResource); } catch (SyncDeviceOwnerConflictException e) { LOGGER.error("Device owner has conflicts, so skip the loading file", e); - } catch (TsFileProcessorException | StorageEngineException e) { + } catch (LoadFileException | StorageEngineException e) { LOGGER.error("Can not load new tsfile {}", newTsFile.getAbsolutePath(), e); throw new IOException(e); } diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java index 1f377c5..4b01523 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java @@ -186,7 +186,7 @@ public class IoTDBLoadExternalTsfileTest { .getSequenceFileTreeSet()); File tmpDir = new File( resources.get(0).getFile().getParentFile().getParentFile().getParentFile(), - "tmp" + File.separator + "root.vehicle"); + "tmp" + File.separator + "root.vehicle" + File.separator + "0"); if (!tmpDir.exists()) { tmpDir.mkdirs(); } @@ -199,7 +199,7 @@ public class IoTDBLoadExternalTsfileTest { StorageEngine.getInstance().getProcessor("root.test") .getSequenceFileTreeSet()); tmpDir = new File(resources.get(0).getFile().getParentFile().getParentFile().getParentFile(), - "tmp" + File.separator + "root.test"); + "tmp" + File.separator + "root.test" + File.separator + "0"); if (!tmpDir.exists()) { tmpDir.mkdirs(); } @@ -220,8 +220,8 @@ public class IoTDBLoadExternalTsfileTest { .getSequenceFileTreeSet()); assertEquals(2, resources.size()); assertNotNull(tmpDir.listFiles()); - assertEquals(0, new File(tmpDir, "root.vehicle").listFiles().length); - assertEquals(0, new File(tmpDir, "root.test").listFiles().length); + assertEquals(0, new File(tmpDir, "root.vehicle" + File.separator + "0").listFiles().length); + assertEquals(0, new File(tmpDir, "root.test" + File.separator + "0").listFiles().length); } catch (StorageEngineException e) { Assert.fail(); } @@ -289,7 +289,7 @@ public class IoTDBLoadExternalTsfileTest { assertEquals(2, resources.size()); File tmpDir = new File( resources.get(0).getFile().getParentFile().getParentFile().getParentFile(), - "tmp" + File.separator + "root.vehicle"); + "tmp" + File.separator + "root.vehicle" + File.separator + "0"); if (!tmpDir.exists()) { tmpDir.mkdirs(); } @@ -307,7 +307,7 @@ public class IoTDBLoadExternalTsfileTest { resources = new ArrayList<>( StorageEngine.getInstance().getProcessor("root.test").getSequenceFileTreeSet()); assertEquals(2, resources.size()); - tmpDir = new File(tmpDir.getParent(), "root.test"); + tmpDir = new File(tmpDir.getParentFile().getParentFile(), "root.test" + File.separator + "0"); if (!tmpDir.exists()) { tmpDir.mkdirs(); } @@ -322,7 +322,7 @@ public class IoTDBLoadExternalTsfileTest { } // load all tsfile in tmp dir - tmpDir = tmpDir.getParentFile(); + tmpDir = tmpDir.getParentFile().getParentFile(); statement.execute(String.format("load %s", tmpDir.getAbsolutePath())); assertEquals(2, StorageEngine.getInstance().getProcessor("root.vehicle") .getSequenceFileTreeSet().size()); @@ -333,8 +333,8 @@ public class IoTDBLoadExternalTsfileTest { assertEquals(3, StorageEngine.getInstance().getProcessor("root.test") .getSequenceFileTreeSet().size()); assertNotNull(tmpDir.listFiles()); - assertEquals(0, new File(tmpDir, "root.vehicle").listFiles().length); - assertEquals(0, new File(tmpDir, "root.test").listFiles().length); + assertEquals(0, new File(tmpDir, "root.vehicle" + File.separator + "0").listFiles().length); + assertEquals(0, new File(tmpDir, "root.test" + File.separator + "0").listFiles().length); // check query result hasResultSet = statement.execute("SELECT * FROM root"); @@ -370,7 +370,7 @@ public class IoTDBLoadExternalTsfileTest { File tmpDir = new File( resources.get(0).getFile().getParentFile().getParentFile().getParentFile(), - "tmp" + File.separator + "root.vehicle"); + "tmp" + File.separator + "root.vehicle" + File.separator + "0"); if (!tmpDir.exists()) { tmpDir.mkdirs(); } @@ -383,7 +383,7 @@ public class IoTDBLoadExternalTsfileTest { StorageEngine.getInstance().getProcessor("root.test") .getSequenceFileTreeSet()); tmpDir = new File(resources.get(0).getFile().getParentFile().getParentFile().getParentFile(), - "tmp" + File.separator + "root.test"); + "tmp" + File.separator + "root.test" + File.separator + "0"); if (!tmpDir.exists()) { tmpDir.mkdirs(); } @@ -432,7 +432,7 @@ public class IoTDBLoadExternalTsfileTest { Assert.assertTrue(hasError); // test load metadata automatically, it will succeed. - tmpDir = tmpDir.getParentFile(); + tmpDir = tmpDir.getParentFile().getParentFile(); statement.execute(String.format("load %s true 2", tmpDir.getAbsolutePath())); resources = new ArrayList<>( StorageEngine.getInstance().getProcessor("root.vehicle") @@ -444,9 +444,10 @@ public class IoTDBLoadExternalTsfileTest { assertEquals(2, resources.size()); assertEquals(2, tmpDir.listFiles().length); for (File dir : tmpDir.listFiles()) { - assertEquals(0, dir.listFiles().length); + assertEquals(0, dir.listFiles()[0].listFiles().length); } } catch (StorageEngineException e) { + e.printStackTrace(); Assert.fail(); } } diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index 74710cd..b22c431 100644 --- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -39,6 +39,7 @@ public enum TSStatusCode { STORAGE_ENGINE_ERROR(313), TSFILE_PROCESSOR_ERROR(314), PATH_ILLEGAL(315), + LOAD_FILE_ERROR(316), EXECUTE_STATEMENT_ERROR(400), SQL_PARSE_ERROR(401),