This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f6f216e Integrate data file version recording with time partitioning
(#935)
f6f216e is described below
commit f6f216e888459dedf392cbe1d7754f097e11db91
Author: Jiang Tian <[email protected]>
AuthorDate: Wed Mar 25 03:35:21 2020 -0500
Integrate data file version recording with time partitioning (#935)
* integrate data partition with file version management
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 12 +-
.../engine/storagegroup/StorageGroupProcessor.java | 394 ++++++++++++++-------
.../iotdb/db/exception/LoadEmptyFileException.java | 29 ++
.../iotdb/db/exception/LoadFileException.java | 33 ++
.../db/exception/PartitionViolationException.java | 29 ++
.../iotdb/db/sync/receiver/load/FileLoader.java | 4 +-
.../integration/IoTDBLoadExternalTsfileTest.java | 27 +-
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
8 files changed, 380 insertions(+), 149 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index c63f728..c2da578 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -48,8 +48,8 @@ import
org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
@@ -481,13 +481,13 @@ public class StorageEngine implements IService {
}
public void loadNewTsFileForSync(TsFileResource newTsFileResource)
- throws TsFileProcessorException, StorageEngineException {
+ throws StorageEngineException, LoadFileException {
getProcessor(newTsFileResource.getFile().getParentFile().getName())
.loadNewTsFileForSync(newTsFileResource);
}
public void loadNewTsFile(TsFileResource newTsFileResource)
- throws TsFileProcessorException, StorageEngineException,
MetadataException {
+ throws LoadFileException, StorageEngineException, MetadataException {
Map<String, Long> startTimeMap = newTsFileResource.getStartTimeMap();
if (startTimeMap == null || startTimeMap.isEmpty()) {
throw new StorageEngineException("Can not get the corresponding storage
group.");
@@ -549,10 +549,10 @@ public class StorageEngine implements IService {
this.fileFlushPolicy = fileFlushPolicy;
}
- public boolean isFileAlreadyExist(TsFileResource tsFileResource, String
storageGroup) {
- // TODO-Cluster#350: integrate with time partitioning
+ public boolean isFileAlreadyExist(TsFileResource tsFileResource, String
storageGroup,
+ long partitionNum) {
StorageGroupProcessor processor = processorMap.get(storageGroup);
- return processor != null && processor.isFileAlreadyExist(tsFileResource);
+ return processor != null && processor.isFileAlreadyExist(tsFileResource,
partitionNum);
}
public static long getTimePartitionInterval() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 295a074..d6b2c98 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -18,6 +18,28 @@
*/
package org.apache.iotdb.db.engine.storagegroup;
+import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
+import static
org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
+import static
org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -40,7 +62,14 @@ import
org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.*;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.exception.LoadEmptyFileException;
+import org.apache.iotdb.db.exception.LoadFileException;
+import org.apache.iotdb.db.exception.MergeException;
+import org.apache.iotdb.db.exception.PartitionViolationException;
+import org.apache.iotdb.db.exception.StorageGroupProcessorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
+import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -53,6 +82,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryFileManager;
import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
+import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
import org.apache.iotdb.rpc.RpcUtils;
@@ -73,17 +103,6 @@ import
org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
-import static
org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
-import static
org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-
/**
* For sequence data, a StorageGroupProcessor has some TsFileProcessors, in
which there is only one
@@ -110,6 +129,15 @@ public class StorageGroupProcessor {
private static final String MERGING_MODIFICATION_FILE_NAME = "merge.mods";
private static final Logger logger =
LoggerFactory.getLogger(StorageGroupProcessor.class);
private static final int MAX_CACHE_SENSORS = 5000;
+
+ /**
+ * indicating the file to be loaded already exists locally.
+ */
+ private static final int POS_ALREADY_EXIST = -2;
+ /**
+ * indicating that the file to be loaded overlaps with some existing files.
+ */
+ private static final int POS_OVERLAP = -3;
/**
* a read write lock for guaranteeing concurrent safety when accessing all
fields in this class
* (i.e., schema, (un)sequenceFileList, work(un)SequenceTsFileProcessor,
@@ -203,9 +231,16 @@ public class StorageGroupProcessor {
private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
private TsFileFlushPolicy fileFlushPolicy;
- // allDirectFileVersions records the versions of the direct TsFiles
(generated by flush), not
- // including the files generated by merge
- private Set<Long> allDirectFileVersions = new HashSet<>();
+ /**
+ * partitionDirectFileVersions records the versions of the direct TsFiles
(generated by close,
+ * not including the files generated by merge) of each partition.
+ * As data file close is managed by the leader in the distributed version,
the files with the
+ * same version(s) have the same data, despite that the inner structure (the
size and
+ * organization of chunks) may be different, so we can easily find what
remote files we do not
+ * have locally.
+ * partition number -> version number set
+ */
+ private Map<Long, Set<Long>> partitionDirectFileVersions = new HashMap<>();
public StorageGroupProcessor(String systemInfoDir, String storageGroupName,
TsFileFlushPolicy fileFlushPolicy)
@@ -246,14 +281,18 @@ public class StorageGroupProcessor {
if (resource.getFile().length() == 0) {
deleteTsfile(resource.getFile());
}
- allDirectFileVersions.addAll(resource.getHistoricalVersions());
+ String[] filePathSplit = FilePathUtils.splitTsFilePath(resource);
+ long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length
- 2]);
+ partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new
HashSet<>()).addAll(resource.getHistoricalVersions());
}
for (TsFileResource resource : unseqTsFiles) {
//After recover, case the TsFile's length is equal to 0, delete both
the TsFileResource and the file itself
if (resource.getFile().length() == 0) {
deleteTsfile(resource.getFile());
}
- allDirectFileVersions.addAll(resource.getHistoricalVersions());
+ String[] filePathSplit = FilePathUtils.splitTsFilePath(resource);
+ long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length
- 2]);
+ partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new
HashSet<>()).addAll(resource.getHistoricalVersions());
}
String taskName = storageGroupName + "-" + System.currentTimeMillis();
@@ -307,17 +346,15 @@ public class StorageGroupProcessor {
* @return version controller
*/
private VersionController getVersionControllerByTimePartitionId(long
timePartitionId) {
- VersionController res =
timePartitionIdVersionControllerMap.get(timePartitionId);
- if (res == null) {
- try {
- res = new SimpleFileVersionController(storageGroupSysDir.getPath(),
timePartitionId);
- timePartitionIdVersionControllerMap.put(timePartitionId, res);
- } catch (IOException e) {
- logger.error("can't build a version controller for time partition" +
timePartitionId);
- }
- }
-
- return res;
+ return timePartitionIdVersionControllerMap.computeIfAbsent(timePartitionId,
+ id -> {
+ try {
+ return new
SimpleFileVersionController(storageGroupSysDir.getPath(), timePartitionId);
+ } catch (IOException e) {
+ logger.error("can't build a version controller for time partition
{}", timePartitionId);
+ return null;
+ }
+ });
}
private List<TsFileResource> getAllFiles(List<String> folders) {
@@ -328,17 +365,20 @@ public class StorageGroupProcessor {
continue;
}
- for (File timeRangeFileFolder : fileFolder.listFiles()) {
- // some TsFileResource may be being persisted when the system crashed,
try recovering such
- // resources
- continueFailedRenames(timeRangeFileFolder, TEMP_SUFFIX);
+ File[] subFiles = fileFolder.listFiles();
+ if (subFiles != null) {
+ for (File partitionFolder : subFiles) {
+ // some TsFileResource may be being persisted when the system
crashed, try recovering such
+ // resources
+ continueFailedRenames(partitionFolder, TEMP_SUFFIX);
- // some TsFiles were going to be replaced by the merged files when the
system crashed and
- // the process was interrupted before the merged files could be named
- continueFailedRenames(timeRangeFileFolder, MERGE_SUFFIX);
+ // some TsFiles were going to be replaced by the merged files when
the system crashed and
+ // the process was interrupted before the merged files could be named
+ continueFailedRenames(partitionFolder, MERGE_SUFFIX);
- Collections.addAll(tsFiles,
- fsFactory.listFilesBySuffix(timeRangeFileFolder.getAbsolutePath(),
TSFILE_SUFFIX));
+ Collections.addAll(tsFiles,
+ fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(),
TSFILE_SUFFIX));
+ }
}
}
@@ -796,12 +836,12 @@ public class StorageGroupProcessor {
* @return file name
*/
private String getNewTsFileName(long timePartitionId) {
- return getNewTsFileName(System.currentTimeMillis(),
- getVersionControllerByTimePartitionId(timePartitionId).nextVersion(),
0);
+ long version =
getVersionControllerByTimePartitionId(timePartitionId).nextVersion();
+ partitionDirectFileVersions.computeIfAbsent(timePartitionId, p -> new
HashSet<>()).add(version);
+ return getNewTsFileName(System.currentTimeMillis(), version, 0);
}
private String getNewTsFileName(long time, long version, int mergeCnt) {
- allDirectFileVersions.add(version);
return time + IoTDBConstant.TSFILE_NAME_SEPARATOR + version
+ IoTDBConstant.TSFILE_NAME_SEPARATOR + mergeCnt + TSFILE_SUFFIX;
}
@@ -1185,22 +1225,7 @@ public class StorageGroupProcessor {
// time partition to divide storage group
long timePartitionId = StorageEngine.fromTimeToTimePartition(timestamp);
// write log
- if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
- DeletePlan deletionPlan = new DeletePlan(timestamp, new Path(deviceId,
measurementId));
- for (Map.Entry<Long, TsFileProcessor> entry :
workSequenceTsFileProcessors.entrySet()) {
- if (entry.getKey() <= timePartitionId) {
- entry.getValue().getLogNode()
- .write(deletionPlan);
- }
- }
-
- for (Map.Entry<Long, TsFileProcessor> entry :
workUnsequenceTsFileProcessors.entrySet()) {
- if (entry.getKey() <= timePartitionId) {
- entry.getValue().getLogNode()
- .write(deletionPlan);
- }
- }
- }
+ logDeletion(timestamp, deviceId, measurementId, timePartitionId);
Path fullPath = new Path(deviceId, measurementId);
Deletion deletion = new Deletion(fullPath,
@@ -1225,6 +1250,24 @@ public class StorageGroupProcessor {
}
}
+ private void logDeletion(long timestamp, String deviceId, String
measurementId, long timePartitionId)
+ throws IOException {
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
+ DeletePlan deletionPlan = new DeletePlan(timestamp, new Path(deviceId,
measurementId));
+ for (Map.Entry<Long, TsFileProcessor> entry :
workSequenceTsFileProcessors.entrySet()) {
+ if (entry.getKey() <= timePartitionId) {
+ entry.getValue().getLogNode().write(deletionPlan);
+ }
+ }
+
+ for (Map.Entry<Long, TsFileProcessor> entry :
workUnsequenceTsFileProcessors.entrySet()) {
+ if (entry.getKey() <= timePartitionId) {
+ entry.getValue().getLogNode().write(deletionPlan);
+ }
+ }
+ }
+ }
+
private void deleteDataInFiles(Collection<TsFileResource>
tsFileResourceList, Deletion deletion,
List<ModificationFile> updatedModFiles)
@@ -1521,13 +1564,14 @@ public class StorageGroupProcessor {
* @param newTsFileResource tsfile resource
* @UsedBy sync module.
*/
- public void loadNewTsFileForSync(TsFileResource newTsFileResource)
- throws TsFileProcessorException {
+ public void loadNewTsFileForSync(TsFileResource newTsFileResource) throws
LoadFileException {
File tsfileToBeInserted = newTsFileResource.getFile();
+ long newFilePartitionId = getNewFilePartitionId(newTsFileResource);
writeLock();
mergeLock.writeLock().lock();
try {
- if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted,
newTsFileResource)){
+ if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted,
newTsFileResource,
+ newFilePartitionId)){
updateLatestTimeMap(newTsFileResource);
}
} catch (DiskSpaceInsufficientException e) {
@@ -1535,7 +1579,7 @@ public class StorageGroupProcessor {
"Failed to append the tsfile {} to storage group processor {}
because the disk space is insufficient.",
tsfileToBeInserted.getAbsolutePath(),
tsfileToBeInserted.getParentFile().getName());
IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
- throw new TsFileProcessorException(e);
+ throw new LoadFileException(e);
} finally {
mergeLock.writeLock().unlock();
writeUnlock();
@@ -1556,82 +1600,51 @@ public class StorageGroupProcessor {
* @param newTsFileResource tsfile resource
* @UsedBy load external tsfile module
*/
- public void loadNewTsFile(TsFileResource newTsFileResource)
- throws TsFileProcessorException {
+ public void loadNewTsFile(TsFileResource newTsFileResource) throws
LoadFileException {
File tsfileToBeInserted = newTsFileResource.getFile();
+ long newFilePartitionId = getNewFilePartitionId(newTsFileResource);
writeLock();
mergeLock.writeLock().lock();
try {
- boolean isOverlap = false;
- int preIndex = -1, subsequentIndex = sequenceFileTreeSet.size();
-
List<TsFileResource> sequenceList = new ArrayList<>(sequenceFileTreeSet);
- // check new tsfile
- outer:
- for (int i = 0; i < sequenceList.size(); i++) {
- if
(sequenceList.get(i).getFile().getName().equals(tsfileToBeInserted.getName())) {
- return;
- }
- if (i == sequenceList.size() - 1 &&
sequenceList.get(i).getEndTimeMap().isEmpty()) {
- continue;
- }
- boolean hasPre = false, hasSubsequence = false;
- for (String device : newTsFileResource.getStartTimeMap().keySet()) {
- if (sequenceList.get(i).getStartTimeMap().containsKey(device)) {
- long startTime1 =
sequenceList.get(i).getStartTimeMap().get(device);
- long endTime1 = sequenceList.get(i).getEndTimeMap().get(device);
- long startTime2 = newTsFileResource.getStartTimeMap().get(device);
- long endTime2 = newTsFileResource.getEndTimeMap().get(device);
- if (startTime1 > endTime2) {
- hasSubsequence = true;
- } else if (startTime2 > endTime1) {
- hasPre = true;
- } else {
- isOverlap = true;
- break outer;
- }
- }
- }
- if (hasPre && hasSubsequence) {
- isOverlap = true;
- break;
- }
- if (!hasPre && hasSubsequence) {
- subsequentIndex = i;
- break;
- }
- if (hasPre) {
- preIndex = i;
- }
+
+ int insertPos = findInsertionPosition(newTsFileResource,
newFilePartitionId, sequenceList);
+ if (insertPos == POS_ALREADY_EXIST) {
+ return;
}
// loading tsfile by type
- if (isOverlap) {
- loadTsFileByType(LoadTsFileType.LOAD_UNSEQUENCE, tsfileToBeInserted,
newTsFileResource);
+ if (insertPos == POS_OVERLAP) {
+ loadTsFileByType(LoadTsFileType.LOAD_UNSEQUENCE, tsfileToBeInserted,
newTsFileResource,
+ newFilePartitionId);
} else {
// check whether the file name needs to be renamed.
- if (subsequentIndex != sequenceFileTreeSet.size() || preIndex != -1) {
- String newFileName =
getFileNameForLoadingFile(tsfileToBeInserted.getName(), preIndex,
- subsequentIndex,
getTimePartitionFromTsFileResource(newTsFileResource));
+ if (!sequenceFileTreeSet.isEmpty()) {
+ String newFileName =
getFileNameForLoadingFile(tsfileToBeInserted.getName(), insertPos,
+ getTimePartitionFromTsFileResource(newTsFileResource),
sequenceList);
if (!newFileName.equals(tsfileToBeInserted.getName())) {
logger.info("Tsfile {} must be renamed to {} for loading into the
sequence list.",
tsfileToBeInserted.getName(), newFileName);
newTsFileResource.setFile(new
File(tsfileToBeInserted.getParentFile(), newFileName));
}
}
- loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted,
newTsFileResource);
+ loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted,
newTsFileResource,
+ newFilePartitionId);
}
// update latest time map
updateLatestTimeMap(newTsFileResource);
- allDirectFileVersions.addAll(newTsFileResource.getHistoricalVersions());
+ String[] filePathSplit =
FilePathUtils.splitTsFilePath(newTsFileResource);
+ long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length -
2]);
+ partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new
HashSet<>())
+ .addAll(newTsFileResource.getHistoricalVersions());
} catch (DiskSpaceInsufficientException e) {
logger.error(
"Failed to append the tsfile {} to storage group processor {}
because the disk space is insufficient.",
tsfileToBeInserted.getAbsolutePath(),
tsfileToBeInserted.getParentFile().getName());
IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
- throw new TsFileProcessorException(e);
+ throw new LoadFileException(e);
} finally {
mergeLock.writeLock().unlock();
writeUnlock();
@@ -1639,11 +1652,133 @@ public class StorageGroupProcessor {
}
/**
+ * Check and get the partition id of a TsFile to be inserted using the start
times and end
+ * times of devices.
+ * TODO: when the partition violation happens, split the file and load into
different partitions
+ * @throws LoadFileException if the data of the file crosses partitions or it
is empty
+ */
+ private long getNewFilePartitionId(TsFileResource resource) throws
LoadFileException {
+ long partitionId = -1;
+ for (Long startTime : resource.getStartTimeMap().values()) {
+ long p = StorageEngine.fromTimeToTimePartition(startTime);
+ if (partitionId == -1) {
+ partitionId = p;
+ } else {
+ if (partitionId != p) {
+ throw new PartitionViolationException(resource);
+ }
+ }
+ }
+ for (Long endTime : resource.getEndTimeMap().values()) {
+ long p = StorageEngine.fromTimeToTimePartition(endTime);
+ if (partitionId == -1) {
+ partitionId = p;
+ } else {
+ if (partitionId != p) {
+ throw new PartitionViolationException(resource);
+ }
+ }
+ }
+ if (partitionId == -1) {
+ throw new LoadEmptyFileException();
+ }
+ return partitionId;
+ }
+
+ /**
+ * Find the position of "newTsFileResource" in the sequence files if it can
be inserted into them.
+ * @param newTsFileResource
+ * @param newFilePartitionId
+ * @return POS_ALREADY_EXIST(-2) if some file has the same name as the one
to be inserted
+ * POS_OVERLAP(-3) if some file overlaps the new file
+ * an insertion position i >= -1 if the new file can be inserted
between [i, i+1]
+ */
+ private int findInsertionPosition(TsFileResource newTsFileResource, long
newFilePartitionId,
+ List<TsFileResource> sequenceList) {
+ File tsfileToBeInserted = newTsFileResource.getFile();
+
+ int insertPos = -1;
+
+ // find the position where the new file should be inserted
+ for (int i = 0; i < sequenceList.size(); i++) {
+ TsFileResource localFile = sequenceList.get(i);
+ if (localFile.getFile().getName().equals(tsfileToBeInserted.getName())) {
+ return POS_ALREADY_EXIST;
+ }
+ long localPartitionId =
Long.parseLong(localFile.getFile().getParentFile().getName());
+ if (i == sequenceList.size() - 1 && localFile.getEndTimeMap().isEmpty()
+ || newFilePartitionId > localPartitionId) {
+ // skip files that are in the previous partition and the last empty
file, as all the data
+ // in those files must be older than the new file
+ continue;
+ }
+
+ int fileComparison = compareTsFileDevices(newTsFileResource, localFile);
+ switch (fileComparison) {
+ case 0:
+ // some devices are newer but some devices are older, the two files
overlap in general
+ return POS_OVERLAP;
+ case -1:
+ // all devices in localFile are newer than the new file, the new
file can be
+ // inserted before localFile
+ return i - 1;
+ default:
+ // all devices in the local file are older than the new file,
proceed to the next file
+ insertPos = i;
+ }
+ }
+ return insertPos;
+ }
+
+ /**
+ * Compare each device in the two files to find the time relation of them.
+ * @param fileA
+ * @param fileB
+ * @return -1 if fileA is totally older than fileB (A < B)
+ * 0 if fileA is partially older than fileB and partially newer
than fileB (A X B)
+ * 1 if fileA is totally newer than fileB (B < A)
+ */
+ private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB)
{
+ boolean hasPre = false, hasSubsequence = false;
+ for (String device : fileA.getStartTimeMap().keySet()) {
+ if (!fileB.getStartTimeMap().containsKey(device)) {
+ continue;
+ }
+ long startTimeA = fileA.getStartTimeMap().get(device);
+ long endTimeA = fileA.getEndTimeMap().get(device);
+ long startTimeB = fileB.getStartTimeMap().get(device);
+ long endTimeB = fileB.getEndTimeMap().get(device);
+ if (startTimeA > endTimeB) {
+ // A's data of the device is later than B's data
+ hasPre = true;
+ } else if (startTimeB > endTimeA) {
+ // A's data of the device is earlier than B's data
+ hasSubsequence = true;
+ } else {
+ // the two files overlap in the device
+ return 0;
+ }
+ }
+ if (hasPre && hasSubsequence) {
+ // some devices are newer but some devices are older, the two files
overlap in general
+ return 0;
+ }
+ if (!hasPre && hasSubsequence) {
+ // all devices in B are newer than those in A
+ return -1;
+ }
+ // all devices in B are older than those in A, or the two files share no device
+ return 1;
+ }
+
+ /**
* If the historical versions of a file is a sub-set of the given file's,
remove it to reduce
* unnecessary merge. Only used when the file sender and the receiver share
the same file
* close policy.
+ * Warning: DO NOT REMOVE
* @param resource
*/
+ @SuppressWarnings("unused")
public void removeFullyOverlapFiles(TsFileResource resource) {
writeLock();
closeQueryLock.writeLock().lock();
@@ -1702,24 +1837,25 @@ public class StorageGroupProcessor {
* version number is the version number in the tsfile with a larger
timestamp.
*
* @param tsfileName origin tsfile name
+ * @param insertIndex the new file will be inserted between the files
[insertIndex, insertIndex
+ * + 1]
* @return appropriate filename
*/
- private String getFileNameForLoadingFile(String tsfileName, int preIndex,
int subsequentIndex,
- long timePartitionId) {
+ private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
+ long timePartitionId, List<TsFileResource> sequenceList) {
long currentTsFileTime = Long
.parseLong(tsfileName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]);
long preTime;
- List<TsFileResource> sequenceList = new ArrayList<>(sequenceFileTreeSet);
- if (preIndex == -1) {
+ if (insertIndex == -1) {
preTime = 0L;
} else {
- String preName = sequenceList.get(preIndex).getFile().getName();
+ String preName = sequenceList.get(insertIndex).getFile().getName();
preTime =
Long.parseLong(preName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]);
}
- if (subsequentIndex == sequenceFileTreeSet.size()) {
+ if (insertIndex == sequenceFileTreeSet.size() - 1) {
return preTime < currentTsFileTime ? tsfileName :
getNewTsFileName(timePartitionId);
} else {
- String subsequenceName =
sequenceList.get(subsequentIndex).getFile().getName();
+ String subsequenceName = sequenceList.get(insertIndex +
1).getFile().getName();
long subsequenceTime = Long
.parseLong(subsequenceName.split(IoTDBConstant.TSFILE_NAME_SEPARATOR)[0]);
long subsequenceVersion = Long
@@ -1767,19 +1903,18 @@ public class StorageGroupProcessor {
*
* @param type load type
* @param tsFileResource tsfile resource to be loaded
+ * @param filePartitionId the partition id of the new file
* @UsedBy sync module, load external tsfile module.
* @return load the file successfully
*/
private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile,
- TsFileResource tsFileResource)
- throws TsFileProcessorException, DiskSpaceInsufficientException {
+ TsFileResource tsFileResource, long filePartitionId)
+ throws LoadFileException, DiskSpaceInsufficientException {
File targetFile;
- long timeRangeId = StorageEngine.fromTimeToTimePartition(
-
tsFileResource.getStartTimeMap().entrySet().iterator().next().getValue());
switch (type) {
case LOAD_UNSEQUENCE:
targetFile = new
File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
- storageGroupName + File.separatorChar + timeRangeId +
File.separator + tsFileResource
+ storageGroupName + File.separatorChar + filePartitionId +
File.separator + tsFileResource
.getFile().getName());
tsFileResource.setFile(targetFile);
if(unSequenceFileList.contains(tsFileResource)){
@@ -1793,7 +1928,7 @@ public class StorageGroupProcessor {
case LOAD_SEQUENCE:
targetFile =
new
File(DirectoryManager.getInstance().getNextFolderForSequenceFile(),
- storageGroupName + File.separatorChar + timeRangeId +
File.separator
+ storageGroupName + File.separatorChar + filePartitionId +
File.separator
+ tsFileResource.getFile().getName());
tsFileResource.setFile(targetFile);
if(sequenceFileTreeSet.contains(tsFileResource)){
@@ -1805,7 +1940,7 @@ public class StorageGroupProcessor {
syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath());
break;
default:
- throw new TsFileProcessorException(
+ throw new LoadFileException(
String.format("Unsupported type of loading tsfile : %s", type));
}
@@ -1818,7 +1953,7 @@ public class StorageGroupProcessor {
} catch (IOException e) {
logger.error("File renaming failed when loading tsfile. Origin: {},
Target: {}",
syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e);
- throw new TsFileProcessorException(String.format(
+ throw new LoadFileException(String.format(
"File renaming failed when loading tsfile. Origin: %s, Target: %s,
because %s",
syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(),
e.getMessage()));
}
@@ -1832,11 +1967,13 @@ public class StorageGroupProcessor {
} catch (IOException e) {
logger.error("File renaming failed when loading .resource file. Origin:
{}, Target: {}",
syncedResourceFile.getAbsolutePath(),
targetResourceFile.getAbsolutePath(), e);
- throw new TsFileProcessorException(String.format(
+ throw new LoadFileException(String.format(
"File renaming failed when loading .resource file. Origin: %s,
Target: %s, because %s",
syncedResourceFile.getAbsolutePath(),
targetResourceFile.getAbsolutePath(),
e.getMessage()));
}
+ partitionDirectFileVersions.computeIfAbsent(filePartitionId,
+ p -> new HashSet<>()).addAll(tsFileResource.getHistoricalVersions());
return true;
}
@@ -1985,8 +2122,9 @@ public class StorageGroupProcessor {
return storageGroupName;
}
- public boolean isFileAlreadyExist(TsFileResource tsFileResource) {
- return
allDirectFileVersions.containsAll(tsFileResource.getHistoricalVersions());
+ public boolean isFileAlreadyExist(TsFileResource tsFileResource, long
partitionNum) {
+ return partitionDirectFileVersions.getOrDefault(partitionNum,
Collections.emptySet())
+ .containsAll(tsFileResource.getHistoricalVersions());
}
@FunctionalInterface
diff --git
a/server/src/main/java/org/apache/iotdb/db/exception/LoadEmptyFileException.java
b/server/src/main/java/org/apache/iotdb/db/exception/LoadEmptyFileException.java
new file mode 100644
index 0000000..9ba22e2
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/exception/LoadEmptyFileException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.iotdb.db.exception;
+
+public class LoadEmptyFileException extends LoadFileException {
+
+ public LoadEmptyFileException() {
+ super("Cannot load an empty file");
+ }
+
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/exception/LoadFileException.java
b/server/src/main/java/org/apache/iotdb/db/exception/LoadFileException.java
new file mode 100644
index 0000000..3af898d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/LoadFileException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception;
+
+import org.apache.iotdb.rpc.TSStatusCode;
+/** An {@link IoTDBException} that always reports the LOAD_FILE_ERROR status code. */
+public class LoadFileException extends IoTDBException {
+ /** @param message text forwarded to the superclass together with LOAD_FILE_ERROR */
+ public LoadFileException(String message) {
+ super(message, TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
+ }
+ /** @param exception exception forwarded to the superclass together with LOAD_FILE_ERROR */
+ public LoadFileException(Exception exception) {
+ super(exception, TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java
b/server/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java
new file mode 100644
index 0000000..c794b61
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/exception/PartitionViolationException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception;
+
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+/** A {@link LoadFileException} whose message states that the data of a file cross partitions. */
+public class PartitionViolationException extends LoadFileException{
+ /** @param resource file named in the message; rendered through the %s conversion */
+ public PartitionViolationException(TsFileResource resource) {
+ super(String.format("The data of file %s cross partitions", resource));
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
index b7cb388..c6a07be 100644
---
a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
+++
b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
@@ -26,9 +26,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.SyncDeviceOwnerConflictException;
-import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.sync.conf.SyncConstant;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.slf4j.Logger;
@@ -139,7 +139,7 @@ public class FileLoader implements IFileLoader {
StorageEngine.getInstance().loadNewTsFileForSync(tsFileResource);
} catch (SyncDeviceOwnerConflictException e) {
LOGGER.error("Device owner has conflicts, so skip the loading file", e);
- } catch (TsFileProcessorException | StorageEngineException e) {
+ } catch (LoadFileException | StorageEngineException e) {
LOGGER.error("Can not load new tsfile {}", newTsFile.getAbsolutePath(),
e);
throw new IOException(e);
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java
index 1f377c5..4b01523 100644
---
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileTest.java
@@ -186,7 +186,7 @@ public class IoTDBLoadExternalTsfileTest {
.getSequenceFileTreeSet());
File tmpDir = new File(
resources.get(0).getFile().getParentFile().getParentFile().getParentFile(),
- "tmp" + File.separator + "root.vehicle");
+ "tmp" + File.separator + "root.vehicle" + File.separator + "0");
if (!tmpDir.exists()) {
tmpDir.mkdirs();
}
@@ -199,7 +199,7 @@ public class IoTDBLoadExternalTsfileTest {
StorageEngine.getInstance().getProcessor("root.test")
.getSequenceFileTreeSet());
tmpDir = new
File(resources.get(0).getFile().getParentFile().getParentFile().getParentFile(),
- "tmp" + File.separator + "root.test");
+ "tmp" + File.separator + "root.test" + File.separator + "0");
if (!tmpDir.exists()) {
tmpDir.mkdirs();
}
@@ -220,8 +220,8 @@ public class IoTDBLoadExternalTsfileTest {
.getSequenceFileTreeSet());
assertEquals(2, resources.size());
assertNotNull(tmpDir.listFiles());
- assertEquals(0, new File(tmpDir, "root.vehicle").listFiles().length);
- assertEquals(0, new File(tmpDir, "root.test").listFiles().length);
+ assertEquals(0, new File(tmpDir, "root.vehicle" + File.separator +
"0").listFiles().length);
+ assertEquals(0, new File(tmpDir, "root.test" + File.separator +
"0").listFiles().length);
} catch (StorageEngineException e) {
Assert.fail();
}
@@ -289,7 +289,7 @@ public class IoTDBLoadExternalTsfileTest {
assertEquals(2, resources.size());
File tmpDir = new File(
resources.get(0).getFile().getParentFile().getParentFile().getParentFile(),
- "tmp" + File.separator + "root.vehicle");
+ "tmp" + File.separator + "root.vehicle" + File.separator + "0");
if (!tmpDir.exists()) {
tmpDir.mkdirs();
}
@@ -307,7 +307,7 @@ public class IoTDBLoadExternalTsfileTest {
resources = new ArrayList<>(
StorageEngine.getInstance().getProcessor("root.test").getSequenceFileTreeSet());
assertEquals(2, resources.size());
- tmpDir = new File(tmpDir.getParent(), "root.test");
+ tmpDir = new File(tmpDir.getParentFile().getParentFile(), "root.test" +
File.separator + "0");
if (!tmpDir.exists()) {
tmpDir.mkdirs();
}
@@ -322,7 +322,7 @@ public class IoTDBLoadExternalTsfileTest {
}
// load all tsfile in tmp dir
- tmpDir = tmpDir.getParentFile();
+ tmpDir = tmpDir.getParentFile().getParentFile();
statement.execute(String.format("load %s", tmpDir.getAbsolutePath()));
assertEquals(2, StorageEngine.getInstance().getProcessor("root.vehicle")
.getSequenceFileTreeSet().size());
@@ -333,8 +333,8 @@ public class IoTDBLoadExternalTsfileTest {
assertEquals(3, StorageEngine.getInstance().getProcessor("root.test")
.getSequenceFileTreeSet().size());
assertNotNull(tmpDir.listFiles());
- assertEquals(0, new File(tmpDir, "root.vehicle").listFiles().length);
- assertEquals(0, new File(tmpDir, "root.test").listFiles().length);
+ assertEquals(0, new File(tmpDir, "root.vehicle" + File.separator +
"0").listFiles().length);
+ assertEquals(0, new File(tmpDir, "root.test" + File.separator +
"0").listFiles().length);
// check query result
hasResultSet = statement.execute("SELECT * FROM root");
@@ -370,7 +370,7 @@ public class IoTDBLoadExternalTsfileTest {
File tmpDir = new File(
resources.get(0).getFile().getParentFile().getParentFile().getParentFile(),
- "tmp" + File.separator + "root.vehicle");
+ "tmp" + File.separator + "root.vehicle" + File.separator + "0");
if (!tmpDir.exists()) {
tmpDir.mkdirs();
}
@@ -383,7 +383,7 @@ public class IoTDBLoadExternalTsfileTest {
StorageEngine.getInstance().getProcessor("root.test")
.getSequenceFileTreeSet());
tmpDir = new
File(resources.get(0).getFile().getParentFile().getParentFile().getParentFile(),
- "tmp" + File.separator + "root.test");
+ "tmp" + File.separator + "root.test" + File.separator + "0");
if (!tmpDir.exists()) {
tmpDir.mkdirs();
}
@@ -432,7 +432,7 @@ public class IoTDBLoadExternalTsfileTest {
Assert.assertTrue(hasError);
// test load metadata automatically, it will succeed.
- tmpDir = tmpDir.getParentFile();
+ tmpDir = tmpDir.getParentFile().getParentFile();
statement.execute(String.format("load %s true 2",
tmpDir.getAbsolutePath()));
resources = new ArrayList<>(
StorageEngine.getInstance().getProcessor("root.vehicle")
@@ -444,9 +444,10 @@ public class IoTDBLoadExternalTsfileTest {
assertEquals(2, resources.size());
assertEquals(2, tmpDir.listFiles().length);
for (File dir : tmpDir.listFiles()) {
- assertEquals(0, dir.listFiles().length);
+ assertEquals(0, dir.listFiles()[0].listFiles().length);
}
} catch (StorageEngineException e) {
+ e.printStackTrace();
Assert.fail();
}
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 74710cd..b22c431 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -39,6 +39,7 @@ public enum TSStatusCode {
STORAGE_ENGINE_ERROR(313),
TSFILE_PROCESSOR_ERROR(314),
PATH_ILLEGAL(315),
+ LOAD_FILE_ERROR(316),
EXECUTE_STATEMENT_ERROR(400),
SQL_PARSE_ERROR(401),