This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch improve/recover in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit f2d9c8b25b03aa231962e82e019243591982d8c3 Author: JackieTien97 <[email protected]> AuthorDate: Thu Jul 23 11:17:40 2020 +0800 combine the duplicate method --- .../engine/storagegroup/StorageGroupProcessor.java | 226 ++++++++++----------- .../writelog/recover/TsFileRecoverPerformer.java | 2 +- 2 files changed, 106 insertions(+), 122 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index f1818ec..a6cf002 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -72,7 +72,6 @@ import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.OutOfTTLException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.MManager; -import org.apache.iotdb.db.metadata.mnode.InternalMNode; import org.apache.iotdb.db.metadata.mnode.LeafMNode; import org.apache.iotdb.db.metadata.mnode.MNode; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; @@ -228,22 +227,20 @@ public class StorageGroupProcessor { private TsFileFlushPolicy fileFlushPolicy; /** - * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close, - * not including the files generated by merge) of each partition. - * As data file close is managed by the leader in the distributed version, the files with the - * same version(s) have the same data, despite that the inner structure (the size and - * organization of chunks) may be different, so we can easily find what remote files we do not - * have locally. - * partition number -> version number set + * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close, not + * including the files generated by merge) of each partition. As data file close is managed by the + * leader in the distributed version, the files with the same version(s) have the same data, + * despite that the inner structure (the size and organization of chunks) may be different, so we + * can easily find what remote files we do not have locally. partition number -> version number + * set */ private Map<Long, Set<Long>> partitionDirectFileVersions = new HashMap<>(); /** - * The max file versions in each partition. By recording this, if several IoTDB instances have - * the same policy of closing file and their ingestion is identical, then files of the same - * version in different IoTDB instance will have identical data, providing convenience for data - * comparison across different instances. - * partition number -> max version number + * The max file versions in each partition. By recording this, if several IoTDB instances have the + * same policy of closing file and their ingestion is identical, then files of the same version in + * different IoTDB instance will have identical data, providing convenience for data comparison + * across different instances. partition number -> max version number */ private Map<Long, Long> partitionMaxFileVersions = new HashMap<>(); @@ -271,27 +268,29 @@ public class StorageGroupProcessor { try { // collect candidate TsFiles from sequential and unsequential data directory Pair<List<TsFileResource>, List<TsFileResource>> seqTsFilesPair = getAllFiles( - DirectoryManager.getInstance().getAllSequenceFileFolders()); + DirectoryManager.getInstance().getAllSequenceFileFolders()); List<TsFileResource> tmpSeqTsFiles = seqTsFilesPair.left; List<TsFileResource> oldSeqTsFiles = seqTsFilesPair.right; upgradeSeqFileList.addAll(oldSeqTsFiles); Pair<List<TsFileResource>, List<TsFileResource>> unseqTsFilesPair = getAllFiles( - DirectoryManager.getInstance().getAllUnSequenceFileFolders()); + DirectoryManager.getInstance().getAllUnSequenceFileFolders()); List<TsFileResource> tmpUnseqTsFiles = unseqTsFilesPair.left; List<TsFileResource> oldUnseqTsFiles = unseqTsFilesPair.right; upgradeUnseqFileList.addAll(oldUnseqTsFiles); - recoverSeqFiles(tmpSeqTsFiles); - recoverUnseqFiles(tmpUnseqTsFiles); + recoverTsFiles(tmpSeqTsFiles, true, workSequenceTsFileProcessors, sequenceFileTreeSet); + recoverTsFiles(tmpUnseqTsFiles, false, workUnsequenceTsFileProcessors, unSequenceFileList); for (TsFileResource resource : sequenceFileTreeSet) { long partitionNum = resource.getTimePartition(); - partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions()); + partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()) + .addAll(resource.getHistoricalVersions()); updatePartitionFileVersion(partitionNum, Collections.max(resource.getHistoricalVersions())); } for (TsFileResource resource : unSequenceFileList) { long partitionNum = resource.getTimePartition(); - partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions()); + partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()) + .addAll(resource.getHistoricalVersions()); updatePartitionFileVersion(partitionNum, Collections.max(resource.getHistoricalVersions())); } @@ -316,7 +315,6 @@ public class StorageGroupProcessor { throw new StorageGroupProcessorException(e); } - for (TsFileResource resource : sequenceFileTreeSet) { long timePartitionId = resource.getTimePartition(); Map<String, Long> endTimeMap = new HashMap<>(); @@ -345,11 +343,11 @@ public class StorageGroupProcessor { /** * use old seq file to update latestTimeForEachDevice, globalLatestFlushedTimeForEachDevice, * partitionLatestFlushedTimeForEachDevice and timePartitionIdVersionControllerMap - * */ private void updateLastestFlushedTime() throws IOException { - VersionController versionController = new SimpleFileVersionController(storageGroupSysDir.getPath()); + VersionController versionController = new SimpleFileVersionController( + storageGroupSysDir.getPath()); long currentVersion = versionController.currVersion(); for (TsFileResource resource : upgradeSeqFileList) { for (Entry<String, Integer> entry : resource.getDeviceToIndexMap().entrySet()) { @@ -358,24 +356,27 @@ public class StorageGroupProcessor { long endTime = resource.getEndTime(index); long endTimePartitionId = StorageEngine.getTimePartition(endTime); latestTimeForEachDevice.computeIfAbsent(endTimePartitionId, l -> new HashMap<>()) - .put(deviceId, endTime); + .put(deviceId, endTime); globalLatestFlushedTimeForEachDevice.put(deviceId, endTime); // set all the covered partition's LatestFlushedTime to Long.MAX_VALUE long partitionId = StorageEngine.getTimePartition(resource.getStartTime(index)); while (partitionId <= endTimePartitionId) { partitionLatestFlushedTimeForEachDevice.computeIfAbsent(partitionId, l -> new HashMap<>()) - .put(deviceId, Long.MAX_VALUE); + .put(deviceId, Long.MAX_VALUE); if (!timePartitionIdVersionControllerMap.containsKey(partitionId)) { - File directory = SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, String.valueOf(partitionId)); - if(!directory.exists()){ + File directory = SystemFileFactory.INSTANCE + .getFile(storageGroupSysDir, String.valueOf(partitionId)); + if (!directory.exists()) { directory.mkdirs(); } - File versionFile = SystemFileFactory.INSTANCE.getFile(directory, SimpleFileVersionController.FILE_PREFIX + currentVersion); + File versionFile = SystemFileFactory.INSTANCE + .getFile(directory, SimpleFileVersionController.FILE_PREFIX + currentVersion); if (!versionFile.createNewFile()) { logger.warn("Version file {} has already been created ", versionFile); } - timePartitionIdVersionControllerMap.put(partitionId, new SimpleFileVersionController(storageGroupSysDir.getPath(), partitionId)); + timePartitionIdVersionControllerMap.put(partitionId, + new SimpleFileVersionController(storageGroupSysDir.getPath(), partitionId)); } partitionId++; } @@ -401,7 +402,8 @@ public class StorageGroupProcessor { }); } - private Pair<List<TsFileResource>, List<TsFileResource>> getAllFiles(List<String> folders) throws IOException { + private Pair<List<TsFileResource>, List<TsFileResource>> getAllFiles(List<String> folders) + throws IOException { List<File> tsFiles = new ArrayList<>(); List<File> upgradeFiles = new ArrayList<>(); for (String baseDir : folders) { @@ -419,9 +421,12 @@ public class StorageGroupProcessor { // the process was interrupted before the merged files could be named continueFailedRenames(fileFolder, MERGE_SUFFIX); - File[] oldTsfileArray = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), TSFILE_SUFFIX); - File[] oldResourceFileArray = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), TsFileResource.RESOURCE_SUFFIX); - File[] oldModificationFileArray = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), ModificationFile.FILE_SUFFIX); + File[] oldTsfileArray = fsFactory + .listFilesBySuffix(fileFolder.getAbsolutePath(), TSFILE_SUFFIX); + File[] oldResourceFileArray = fsFactory + .listFilesBySuffix(fileFolder.getAbsolutePath(), TsFileResource.RESOURCE_SUFFIX); + File[] oldModificationFileArray = fsFactory + .listFilesBySuffix(fileFolder.getAbsolutePath(), ModificationFile.FILE_SUFFIX); File upgradeFolder = fsFactory.getFile(fileFolder, IoTDBConstant.UPGRADE_FOLDER_NAME); // move the old files to upgrade folder if exists if (oldTsfileArray.length != 0 || oldResourceFileArray.length != 0) { @@ -511,13 +516,14 @@ public class StorageGroupProcessor { } } - private void recoverSeqFiles(List<TsFileResource> tsFiles) { + private void recoverTsFiles(List<TsFileResource> tsFiles, boolean isSeq, + TreeMap<Long, TsFileProcessor> treeMap, Collection<TsFileResource> tsFileResources) { for (int i = 0; i < tsFiles.size(); i++) { TsFileResource tsFileResource = tsFiles.get(i); long timePartitionId = tsFileResource.getTimePartition(); TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-", - getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, false, + getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, !isSeq, i == tsFiles.size() - 1); RestorableTsFileIOWriter writer; @@ -535,49 +541,14 @@ public class StorageGroupProcessor { TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource, getVersionControllerByTimePartitionId(timePartitionId), this::closeUnsealedTsFileProcessorCallBack, - this::updateLatestFlushTimeCallback, true, writer); - workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor); - tsFileResource.setProcessor(tsFileProcessor); - tsFileResource.removeResourceFile(); - tsFileProcessor.setTimeRangeId(timePartitionId); - writer.makeMetadataVisible(); - } - sequenceFileTreeSet.add(tsFileResource); - } - } - - private void recoverUnseqFiles(List<TsFileResource> tsFiles) { - for (int i = 0; i < tsFiles.size(); i++) { - TsFileResource tsFileResource = tsFiles.get(i); - long timePartitionId = tsFileResource.getTimePartition(); - - TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-", - getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, true, - i == tsFiles.size() - 1); - RestorableTsFileIOWriter writer; - try { - writer = recoverPerformer.recover(); - } catch (StorageGroupProcessorException e) { - logger.warn("Skip TsFile: {} because of error in recover: ", tsFileResource.getPath(), e); - continue; - } - if (i != tsFiles.size() - 1 || !writer.canWrite()) { - // not the last file or cannot write, just close it - tsFileResource.setClosed(true); - } else if (writer.canWrite()) { - // the last file is not closed, continue writing to it - TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource, - getVersionControllerByTimePartitionId(timePartitionId), - this::closeUnsealedTsFileProcessorCallBack, - this::unsequenceFlushCallback, false, writer); - workUnsequenceTsFileProcessors - .put(timePartitionId, tsFileProcessor); + this::updateLatestFlushTimeCallback, isSeq, writer); + treeMap.put(timePartitionId, tsFileProcessor); tsFileResource.setProcessor(tsFileProcessor); tsFileResource.removeResourceFile(); tsFileProcessor.setTimeRangeId(timePartitionId); writer.makeMetadataVisible(); } - unSequenceFileList.add(tsFileResource); + tsFileResources.add(tsFileResource); } } @@ -609,7 +580,8 @@ public class StorageGroupProcessor { long timePartitionId = StorageEngine.getTimePartition(insertPlan.getTime()); latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>()); - partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>()); + partitionLatestFlushedTimeForEachDevice + .computeIfAbsent(timePartitionId, id -> new HashMap<>()); // insert to sequence or unSequence file insertToTsFileProcessor(insertPlan, @@ -648,7 +620,8 @@ public class StorageGroupProcessor { // before is first start point int before = loc; // before time partition - long beforeTimePartition = StorageEngine.getTimePartition(insertTabletPlan.getTimes()[before]); + long beforeTimePartition = StorageEngine + .getTimePartition(insertTabletPlan.getTimes()[before]); // init map long lastFlushTime = partitionLatestFlushedTimeForEachDevice. computeIfAbsent(beforeTimePartition, id -> new HashMap<>()). @@ -709,15 +682,15 @@ public class StorageGroupProcessor { } /** - * insert batch to tsfile processor thread-safety that the caller need to guarantee - * The rows to be inserted are in the range [start, end) + * insert batch to tsfile processor thread-safety that the caller need to guarantee The rows to be + * inserted are in the range [start, end) * * @param insertTabletPlan insert a tablet of a device - * @param sequence whether is sequence - * @param start start index of rows to be inserted in insertTabletPlan - * @param end end index of rows to be inserted in insertTabletPlan - * @param results result array - * @param timePartitionId time partition id + * @param sequence whether is sequence + * @param start start index of rows to be inserted in insertTabletPlan + * @param end end index of rows to be inserted in insertTabletPlan + * @param results result array + * @param timePartitionId time partition id */ private void insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan, int start, int end, boolean sequence, TSStatus[] results, long timePartitionId) @@ -845,10 +818,10 @@ public class StorageGroupProcessor { /** * get processor from hashmap, flush oldest processor if necessary * - * @param timeRangeId time partition range + * @param timeRangeId time partition range * @param tsFileProcessorTreeMap tsFileProcessorTreeMap - * @param fileList file list to add new processor - * @param sequence whether is sequence or not + * @param fileList file list to add new processor + * @param sequence whether is sequence or not */ private TsFileProcessor getOrCreateTsFileProcessorIntern(long timeRangeId, TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap, @@ -1228,7 +1201,7 @@ public class StorageGroupProcessor { .query(deviceId, measurementId, schema.getType(), schema.getEncodingType(), schema.getProps(), context); - tsfileResourcesForQuery.add(new TsFileResource(tsFileResource.getFile(), + tsfileResourcesForQuery.add(new TsFileResource(tsFileResource.getFile(), tsFileResource.getDeviceToIndexMap(), tsFileResource.getStartTimes(), tsFileResource.getEndTimes(), pair.left, pair.right)); @@ -1265,7 +1238,8 @@ public class StorageGroupProcessor { int deviceIndex = tsFileResource.getDeviceToIndexMap().get(deviceId); long startTime = tsFileResource.getStartTime(deviceIndex); - long endTime = tsFileResource.isClosed() || !isSeq ? tsFileResource.getEndTime(deviceIndex) : Long.MAX_VALUE; + long endTime = tsFileResource.isClosed() || !isSeq ? tsFileResource.getEndTime(deviceIndex) + : Long.MAX_VALUE; if (!isAlive(endTime)) { return false; @@ -1282,9 +1256,9 @@ public class StorageGroupProcessor { * Delete data whose timestamp <= 'timestamp' and belongs to the time series * deviceId.measurementId. * - * @param deviceId the deviceId of the timeseries to be deleted. + * @param deviceId the deviceId of the timeseries to be deleted. * @param measurementId the measurementId of the timeseries to be deleted. - * @param timestamp the delete range is (0, timestamp]. + * @param timestamp the delete range is (0, timestamp]. */ public void delete(String deviceId, String measurementId, long timestamp) throws IOException { // TODO: how to avoid partial deletion? @@ -1339,7 +1313,8 @@ public class StorageGroupProcessor { } } - private void logDeletion(long timestamp, String deviceId, String measurementId, long timePartitionId) + private void logDeletion(long timestamp, String deviceId, String measurementId, + long timePartitionId) throws IOException { if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) { DeletePlan deletionPlan = new DeletePlan(timestamp, new Path(deviceId, measurementId)); @@ -1421,7 +1396,8 @@ public class StorageGroupProcessor { partitionLatestFlushedTimeForEachDevice .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>()) .put(entry.getKey(), entry.getValue()); - updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(), entry.getKey(), entry.getValue()); + updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(), + entry.getKey(), entry.getValue()); if (globalLatestFlushedTimeForEachDevice .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) { globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue()); @@ -1434,10 +1410,11 @@ public class StorageGroupProcessor { /** * used for upgrading */ - public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(long partitionId, String deviceId, long time) { + public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(long partitionId, + String deviceId, long time) { newlyFlushedPartitionLatestFlushedTimeForEachDevice - .computeIfAbsent(partitionId, id -> new HashMap<>()) - .compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time)); + .computeIfAbsent(partitionId, id -> new HashMap<>()) + .compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time)); } /** @@ -1490,9 +1467,9 @@ public class StorageGroupProcessor { List<TsFileResource> upgradedResources = tsFileResource.getUpgradedResources(); for (TsFileResource resource : upgradedResources) { long partitionId = resource.getTimePartition(); - resource.getDeviceToIndexMap().forEach((device, index) -> - updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(partitionId, device, - resource.getEndTime(index)) + resource.getDeviceToIndexMap().forEach((device, index) -> + updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(partitionId, device, + resource.getEndTime(index)) ); } insertLock.writeLock().lock(); @@ -1506,7 +1483,7 @@ public class StorageGroupProcessor { } mergeLock.writeLock().unlock(); insertLock.writeLock().unlock(); - + // after upgrade complete, update partitionLatestFlushedTimeForEachDevice if (countUpgradeFiles() == 0) { for (Entry<Long, Map<String, Long>> entry : newlyFlushedPartitionLatestFlushedTimeForEachDevice @@ -1678,7 +1655,7 @@ public class StorageGroupProcessor { if (seqFile.getWriteQueryLock().writeLock().isHeldByCurrentThread()) { seqFile.getWriteQueryLock().writeLock().unlock(); } - if(mergeLock.writeLock().isHeldByCurrentThread()) { + if (mergeLock.writeLock().isHeldByCurrentThread()) { mergeLock.writeLock().unlock(); } } @@ -1717,7 +1694,7 @@ public class StorageGroupProcessor { mergeLock.writeLock().lock(); try { if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource, - newFilePartitionId)){ + newFilePartitionId)) { updateLatestTimeMap(newTsFileResource); } } catch (DiskSpaceInsufficientException e) { @@ -1772,7 +1749,8 @@ public class StorageGroupProcessor { if (!newFileName.equals(tsfileToBeInserted.getName())) { logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.", tsfileToBeInserted.getName(), newFileName); - newTsFileResource.setFile(fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName)); + newTsFileResource + .setFile(fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName)); } } loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource, @@ -1784,7 +1762,8 @@ public class StorageGroupProcessor { long partitionNum = newTsFileResource.getTimePartition(); partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()) .addAll(newTsFileResource.getHistoricalVersions()); - updatePartitionFileVersion(partitionNum, Collections.max(newTsFileResource.getHistoricalVersions())); + updatePartitionFileVersion(partitionNum, + Collections.max(newTsFileResource.getHistoricalVersions())); } catch (DiskSpaceInsufficientException e) { logger.error( "Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.", @@ -1798,12 +1777,14 @@ public class StorageGroupProcessor { } /** - * Find the position of "newTsFileResource" in the sequence files if it can be inserted into them. + * Find the position of "newTsFileResource" in the sequence files if it can be inserted into + * them. + * * @param newTsFileResource * @param newFilePartitionId - * @return POS_ALREADY_EXIST(-2) if some file has the same name as the one to be inserted - * POS_OVERLAP(-3) if some file overlaps the new file - * an insertion position i >= -1 if the new file can be inserted between [i, i+1] + * @return POS_ALREADY_EXIST(- 2) if some file has the same name as the one to be inserted + * POS_OVERLAP(-3) if some file overlaps the new file an insertion position i >= -1 if the new + * file can be inserted between [i, i+1] */ private int findInsertionPosition(TsFileResource newTsFileResource, long newFilePartitionId, List<TsFileResource> sequenceList) { @@ -1844,11 +1825,11 @@ public class StorageGroupProcessor { /** * Compare each device in the two files to find the time relation of them. + * * @param fileA * @param fileB - * @return -1 if fileA is totally older than fileB (A < B) - * 0 if fileA is partially older than fileB and partially newer than fileB (A X B) - * 1 if fileA is totally newer than fileB (B < A) + * @return -1 if fileA is totally older than fileB (A < B) 0 if fileA is partially older than + * fileB and partially newer than fileB (A X B) 1 if fileA is totally newer than fileB (B < A) */ private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) { boolean hasPre = false, hasSubsequence = false; @@ -1885,9 +1866,9 @@ public class StorageGroupProcessor { /** * If the historical versions of a file is a sub-set of the given file's, remove it to reduce - * unnecessary merge. Only used when the file sender and the receiver share the same file - * close policy. - * Warning: DO NOT REMOVE + * unnecessary merge. Only used when the file sender and the receiver share the same file close + * policy. Warning: DO NOT REMOVE + * * @param resource */ @SuppressWarnings("unused") @@ -1948,9 +1929,9 @@ public class StorageGroupProcessor { * returns directly; otherwise, the time stamp is the mean of the timestamps of the two files, the * version number is the version number in the tsfile with a larger timestamp. * - * @param tsfileName origin tsfile name - * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex - * + 1] + * @param tsfileName origin tsfile name + * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex + + * 1] * @return appropriate filename */ private String getFileNameForLoadingFile(String tsfileName, int insertIndex, @@ -2014,12 +1995,12 @@ public class StorageGroupProcessor { /** * Execute the loading process by the type. * - * @param type load type - * @param tsFileResource tsfile resource to be loaded + * @param type load type + * @param tsFileResource tsfile resource to be loaded * @param filePartitionId the partition id of the new file - * @UsedBy sync module, load external tsfile module. * @return load the file successfully * @UsedBy sync module, load external tsfile module. + * @UsedBy sync module, load external tsfile module. */ private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile, TsFileResource tsFileResource, long filePartitionId) @@ -2027,9 +2008,11 @@ public class StorageGroupProcessor { File targetFile; switch (type) { case LOAD_UNSEQUENCE: - targetFile = fsFactory.getFile(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(), - storageGroupName + File.separatorChar + filePartitionId + File.separator + tsFileResource - .getFile().getName()); + targetFile = fsFactory + .getFile(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(), + storageGroupName + File.separatorChar + filePartitionId + File.separator + + tsFileResource + .getFile().getName()); tsFileResource.setFile(targetFile); if (unSequenceFileList.contains(tsFileResource)) { logger.error("The file {} has already been loaded in unsequence list", tsFileResource); @@ -2088,7 +2071,8 @@ public class StorageGroupProcessor { } partitionDirectFileVersions.computeIfAbsent(filePartitionId, p -> new HashSet<>()).addAll(tsFileResource.getHistoricalVersions()); - updatePartitionFileVersion(filePartitionId, Collections.max(tsFileResource.getHistoricalVersions())); + updatePartitionFileVersion(filePartitionId, + Collections.max(tsFileResource.getHistoricalVersions())); return true; } diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java index c4818e3..22d2904 100644 --- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java +++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java @@ -76,7 +76,7 @@ public class TsFileRecoverPerformer { * 1. recover the TsFile by RestorableTsFileIOWriter and truncate the file to remaining corrected * data 2. redo the WALs to recover unpersisted data 3. flush and close the file 4. clean WALs * - * @return a RestorableTsFileIOWriter if the file is not closed before crush, so this writer can + * @return a RestorableTsFileIOWriter if the file is not closed before crash, so this writer can * be used to continue writing */ public RestorableTsFileIOWriter recover() throws StorageGroupProcessorException {
