This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch reimpl_sync in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 10688c03206b6ee766fb982711710a8f89f341fe Author: lta <[email protected]> AuthorDate: Tue Sep 3 20:00:36 2019 +0800 finish file renaming for loaded sequence tsfiles --- .../org/apache/iotdb/db/conf/IoTDBConstant.java | 1 + .../iotdb/db/engine/merge/task/MergeFileTask.java | 7 +- .../engine/storagegroup/StorageGroupProcessor.java | 113 ++++++++++++++------- .../iotdb/db/sync/receiver/load/FileLoader.java | 45 +++++++- .../db/sync/receiver/load/FileLoaderTest.java | 95 +++++++++-------- .../recover/SyncReceiverLogAnalyzerTest.java | 14 +-- 6 files changed, 183 insertions(+), 92 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java index c29472b..0b6f0cc 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java @@ -68,5 +68,6 @@ public class IoTDBConstant { // data folder name public static final String SEQUENCE_FLODER_NAME = "sequence"; public static final String UNSEQUENCE_FLODER_NAME = "unsequence"; + public static final String FILE_NAME_SEPARATOR = "-"; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java index 3e5dc84..32050d8 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import org.apache.commons.io.FileUtils; +import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache; import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache; import org.apache.iotdb.db.engine.merge.manage.MergeContext; @@ -232,10 +233,12 @@ class MergeFileTask { } private File getNextMergeVersionFile(File seqFile) { - String[] splits = seqFile.getName().replace(TSFILE_SUFFIX, "").split("-"); + String[] splits = seqFile.getName().replace(TSFILE_SUFFIX, "") + .split(IoTDBConstant.FILE_NAME_SEPARATOR); int mergeVersion = Integer.parseInt(splits[2]) + 1; return new File(seqFile.getParentFile(), - splits[0] + "-" + splits[1] + "-" + mergeVersion + TSFILE_SUFFIX); + splits[0] + IoTDBConstant.FILE_NAME_SEPARATOR + splits[1] + + IoTDBConstant.FILE_NAME_SEPARATOR + mergeVersion + TSFILE_SUFFIX); } private long writeUnmergedChunks(List<Long> chunkStartTimes, diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 3555e3d..f61f4e6 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -301,9 +301,11 @@ public class StorageGroupProcessor { // TsFileNameComparator compares TsFiles by the version number in its name // ({systemTime}-{versionNum}-{mergeNum}.tsfile) - public int compareFileName(File o1, File o2) { - String[] items1 = o1.getName().replace(TSFILE_SUFFIX, "").split("-"); - String[] items2 = o2.getName().replace(TSFILE_SUFFIX, "").split("-"); + private int compareFileName(File o1, File o2) { + String[] items1 = o1.getName().replace(TSFILE_SUFFIX, "") + .split(IoTDBConstant.FILE_NAME_SEPARATOR); + String[] items2 = o2.getName().replace(TSFILE_SUFFIX, "") + .split(IoTDBConstant.FILE_NAME_SEPARATOR); if (Long.valueOf(items1[0]) - Long.valueOf(items2[0]) == 0) { return Long.compare(Long.valueOf(items1[1]), Long.valueOf(items2[1])); } else { @@ -495,8 +497,8 @@ public class StorageGroupProcessor { new File(baseDir, storageGroupName).mkdirs(); String filePath = Paths.get(baseDir, storageGroupName, - System.currentTimeMillis() + "-" + versionController.nextVersion()).toString() + "-0" - + TSFILE_SUFFIX; + System.currentTimeMillis() + IoTDBConstant.FILE_NAME_SEPARATOR + versionController + .nextVersion()).toString() + IoTDBConstant.FILE_NAME_SEPARATOR + "0" + TSFILE_SUFFIX; if (sequence) { return new TsFileProcessor(storageGroupName, new File(filePath), @@ -982,6 +984,9 @@ public class StorageGroupProcessor { // check new tsfile outer: for (int i = 0; i < sequenceFileList.size(); i++) { + if (sequenceFileList.get(i).getFile().getName().equals(newTsFile.getName())) { + return; + } if (i == sequenceFileList.size() - 1 && sequenceFileList.get(i).getEndTimeMap().isEmpty()) { continue; } @@ -1032,7 +1037,7 @@ public class StorageGroupProcessor { // update latest time map updateLatestTimeMap(newTsFileResource); - } catch (TsFileProcessorException e) { + } catch (TsFileProcessorException | DiskSpaceInsufficientException e) { logger.error("Failed to append the tsfile {} to storage group processor {}.", newTsFile.getAbsolutePath(), newTsFile.getParentFile().getName()); throw new TsFileProcessorException(e); @@ -1072,54 +1077,87 @@ public class StorageGroupProcessor { * @UsedBy sync module */ private void loadTsFileByType(int type, File tsFile, TsFileResource tsFileResource, int index) - throws TsFileProcessorException { + throws TsFileProcessorException, DiskSpaceInsufficientException { File targetFile; if (type == -1) { targetFile = - new File(tsFile.getParentFile().getParentFile().getParentFile().getParentFile() - .getParentFile(), IoTDBConstant.UNSEQUENCE_FLODER_NAME - + File.separatorChar + tsFile.getParentFile().getName() + File.separatorChar - + tsFile.getName()); + new File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(), + tsFile.getParentFile().getName() + File.separatorChar + tsFile.getName()); tsFileResource.setFile(targetFile); unSequenceFileList.add(index, tsFileResource); + logger + .info("Load tsfile in unsequence list, move file from {} to {}", tsFile.getAbsolutePath(), + targetFile.getAbsolutePath()); } else { targetFile = - new File(tsFile.getParentFile().getParentFile().getParentFile().getParentFile() - .getParentFile(), IoTDBConstant.SEQUENCE_FLODER_NAME - + File.separatorChar + tsFile.getParentFile().getName() + File.separatorChar - + tsFile.getName()); + new File(DirectoryManager.getInstance().getNextFolderForSequenceFile(), + tsFile.getParentFile().getName() + File.separatorChar + getFileNameForLoadingFile( + tsFile.getName(), index)); tsFileResource.setFile(targetFile); sequenceFileList.add(index, tsFileResource); + logger.info("Load tsfile in sequence list, move file from {} to {}", tsFile.getAbsolutePath(), + targetFile.getAbsolutePath()); } // move file from sync dir to data dir if (!targetFile.getParentFile().exists()) { targetFile.getParentFile().mkdirs(); } - if (!new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).exists() && !new File( - targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).exists()) { - throw new TsFileProcessorException( - String - .format("The new .resource file {%s} to be loaded does not exist.", - tsFile.getAbsolutePath())); + if (tsFile.exists() && !targetFile.exists()) { + try { + FileUtils.moveFile(tsFile, targetFile); + } catch (IOException e) { + throw new TsFileProcessorException(String.format( + "File renaming failed when loading tsfile. Origin: %s, Target: %s", + tsFile.getAbsolutePath(), targetFile.getAbsolutePath())); + } } - if (!new File(targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).exists() - && !new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX) - .renameTo(new File(targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX))) { - throw new TsFileProcessorException(String.format( - "File renaming failed when loading .resource file. Origin: %s, Target: %s", - new File(tsFile, TsFileResource.RESOURCE_SUFFIX).getAbsolutePath(), - new File(targetFile, TsFileResource.RESOURCE_SUFFIX).getAbsolutePath())); + if (new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).exists() && !new File( + targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).exists()) { + try { + FileUtils.moveFile(new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX), + new File(targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX)); + } catch (IOException e) { + throw new TsFileProcessorException(String.format( + "File renaming failed when loading .resource file. Origin: %s, Target: %s", + new File(tsFile, TsFileResource.RESOURCE_SUFFIX).getAbsolutePath(), + new File(targetFile, TsFileResource.RESOURCE_SUFFIX).getAbsolutePath())); + } } - if (!tsFile.exists() && !targetFile.exists()) { - throw new TsFileProcessorException(String - .format("The new tsfile {%s} to be loaded does not exist.", - tsFile.getAbsolutePath())); + } + + /** + * Get an appropriate filename to ensure the order between files + * + * @param tsfileName origin tsfile name + * @param index the index to be inserted + * @return appropriate filename + */ + private String getFileNameForLoadingFile(String tsfileName, int index) { + long currentTsFileTime = Long.parseLong(tsfileName.split(IoTDBConstant.FILE_NAME_SEPARATOR)[0]); + long preTime; + if (index == 0) { + preTime = 0L; + } else { + String preName = sequenceFileList.get(index - 1).getFile().getName(); + preTime = Long.parseLong(preName.split(IoTDBConstant.FILE_NAME_SEPARATOR)[0]); } - if (!targetFile.exists() && !tsFile.renameTo(targetFile)) { - throw new TsFileProcessorException(String.format( - "File renaming failed when loading tsfile. Origin: %s, Target: %s", - tsFile.getAbsolutePath(), targetFile.getAbsolutePath())); + if (index == sequenceFileList.size()) { + return preTime < currentTsFileTime ? tsfileName + : System.currentTimeMillis() + IoTDBConstant.FILE_NAME_SEPARATOR + versionController + .nextVersion() + IoTDBConstant.FILE_NAME_SEPARATOR + "0" + TSFILE_SUFFIX; + } else { + String subsequenceName = sequenceFileList.get(index).getFile().getName(); + long subsequenceTime = Long + .parseLong(subsequenceName.split(IoTDBConstant.FILE_NAME_SEPARATOR)[0]); + long subsequenceVersion = Long + .parseLong(subsequenceName.split(IoTDBConstant.FILE_NAME_SEPARATOR)[1]); + if (preTime < currentTsFileTime && currentTsFileTime < subsequenceTime) { + return tsfileName; + } else { + return (preTime + ((subsequenceTime - preTime) >> 1)) + IoTDBConstant.FILE_NAME_SEPARATOR + + subsequenceVersion + IoTDBConstant.FILE_NAME_SEPARATOR + "0" + TSFILE_SUFFIX; + } } } @@ -1168,7 +1206,8 @@ public class StorageGroupProcessor { deletedTsFileResource.getMergeQueryLock().writeLock().lock(); try { deletedTsFileResource.getFile().delete(); - new File(deletedTsFileResource.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).delete(); + new File(deletedTsFileResource.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX) + .delete(); } finally { deletedTsFileResource.getMergeQueryLock().writeLock().unlock(); } diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java index 2fa47d9..6d724ed 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java @@ -16,6 +16,8 @@ package org.apache.iotdb.db.sync.receiver.load; import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -25,6 +27,12 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.TsFileProcessorException; import org.apache.iotdb.db.sync.sender.conf.SyncConstant; +import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata; +import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,7 +83,7 @@ public class FileLoader implements IFileLoader { if (loadTask != null) { try { handleLoadTask(loadTask); - } catch (IOException e) { + } catch (Exception e) { LOGGER.error("Can not load task {}", loadTask, e); } } @@ -121,8 +129,12 @@ public class FileLoader implements IFileLoader { loadLog.startLoadTsFiles(); curType = LoadType.ADD; } + if (!newTsFile.exists()) { + LOGGER.info("Tsfile {} doesn't exist.", newTsFile.getAbsolutePath()); + return; + } TsFileResource tsFileResource = new TsFileResource(new File(newTsFile.getAbsolutePath())); - tsFileResource.deSerialize(); + checkTsFileResource(tsFileResource); try { StorageEngine.getInstance().loadNewTsFile(newTsFile, tsFileResource); } catch (TsFileProcessorException | StorageEngineException e) { @@ -132,6 +144,35 @@ public class FileLoader implements IFileLoader { loadLog.finishLoadDeletedFile(newTsFile); } + private void checkTsFileResource(TsFileResource tsFileResource) throws IOException { + if (!tsFileResource.fileExists()) { + // .resource file does not exist, read file metadata and recover tsfile resource + try (TsFileSequenceReader reader = new TsFileSequenceReader( + tsFileResource.getFile().getAbsolutePath())) { + TsFileMetaData metaData = reader.readFileMetadata(); + List<TsDeviceMetadataIndex> deviceMetadataIndexList = new ArrayList<>( + metaData.getDeviceMap().values()); + for (TsDeviceMetadataIndex index : deviceMetadataIndexList) { + TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(index); + List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata + .getChunkGroupMetaDataList(); + for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) { + for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) { + tsFileResource.updateStartTime(chunkGroupMetaData.getDeviceID(), + chunkMetaData.getStartTime()); + tsFileResource + .updateEndTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime()); + } + } + } + } + // write .resource file + tsFileResource.serialize(); + } else { + tsFileResource.deSerialize(); + } + } + private void loadDeletedFile(File deletedTsFile) throws IOException { if (curType != LoadType.DELETE) { loadLog.startLoadDeletedFiles(); diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java index 4f31c57..637483f 100644 --- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java +++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java @@ -20,8 +20,10 @@ package org.apache.iotdb.db.sync.receiver.load; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Random; @@ -83,25 +85,28 @@ public class FileLoaderTest { @Test public void loadNewTsfiles() throws IOException, StorageEngineException { fileLoader = FileLoader.createFileLoader(getReceiverFolderFile()); - Map<String, Set<File>> allFileList = new HashMap<>(); - Map<String, Set<File>> correctSequenceLoadedFileMap = new HashMap<>(); + Map<String, List<File>> allFileList = new HashMap<>(); + Map<String, Set<String>> correctSequenceLoadedFileMap = new HashMap<>(); // add some new tsfiles Random r = new Random(0); + long time = System.currentTimeMillis(); for (int i = 0; i < 3; i++) { for (int j = 0; j < 10; j++) { - allFileList.putIfAbsent(SG_NAME + i, new HashSet<>()); + allFileList.putIfAbsent(SG_NAME + i, new ArrayList<>()); correctSequenceLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>()); String rand = String.valueOf(r.nextInt(10000)); String fileName = - getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + rand + ".tsfile"; + getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + (time + i * 100 + + j) + IoTDBConstant.FILE_NAME_SEPARATOR + rand + + IoTDBConstant.FILE_NAME_SEPARATOR + "0.tsfile"; File syncFile = new File(fileName); File dataFile = new File( syncFile.getParentFile().getParentFile().getParentFile().getParentFile() .getParentFile(), IoTDBConstant.SEQUENCE_FLODER_NAME + File.separatorChar + syncFile.getParentFile().getName() + File.separatorChar + syncFile.getName()); - correctSequenceLoadedFileMap.get(SG_NAME + i).add(dataFile); + correctSequenceLoadedFileMap.get(SG_NAME + i).add(dataFile.getAbsolutePath()); allFileList.get(SG_NAME + i).add(syncFile); if (!syncFile.getParentFile().exists()) { syncFile.getParentFile().mkdirs(); @@ -128,7 +133,7 @@ public class FileLoaderTest { } assert getReceiverFolderFile().exists(); - for (Set<File> set : allFileList.values()) { + for (List<File> set : allFileList.values()) { for (File newTsFile : set) { if (!newTsFile.getName().endsWith(TsFileResource.RESOURCE_SUFFIX)) { fileLoader.addTsfile(newTsFile); @@ -150,19 +155,19 @@ public class FileLoaderTest { } assert !new File(getReceiverFolderFile(), SyncConstant.RECEIVER_DATA_FOLDER_NAME).exists(); - Map<String, Set<File>> sequenceLoadedFileMap = new HashMap<>(); + Map<String, Set<String>> sequenceLoadedFileMap = new HashMap<>(); for (int i = 0; i < 3; i++) { StorageGroupProcessor processor = StorageEngine.getInstance().getProcessor(SG_NAME + i); sequenceLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>()); assert processor.getSequenceFileList().size() == 10; for (TsFileResource tsFileResource : processor.getSequenceFileList()) { - sequenceLoadedFileMap.get(SG_NAME + i).add(tsFileResource.getFile()); + sequenceLoadedFileMap.get(SG_NAME + i).add(tsFileResource.getFile().getAbsolutePath()); } assert processor.getUnSequenceFileList().isEmpty(); } assert sequenceLoadedFileMap.size() == correctSequenceLoadedFileMap.size(); - for (Entry<String, Set<File>> entry : correctSequenceLoadedFileMap.entrySet()) { + for (Entry<String, Set<String>> entry : correctSequenceLoadedFileMap.entrySet()) { String sg = entry.getKey(); assert entry.getValue().size() == sequenceLoadedFileMap.get(sg).size(); assert entry.getValue().containsAll(sequenceLoadedFileMap.get(sg)); @@ -171,23 +176,26 @@ public class FileLoaderTest { // add some overlap new tsfiles fileLoader = FileLoader.createFileLoader(getReceiverFolderFile()); - Map<String, Set<File>> correctUnSequenceLoadedFileMap = new HashMap<>(); + Map<String, Set<String>> correctUnSequenceLoadedFileMap = new HashMap<>(); allFileList = new HashMap<>(); r = new Random(1); + time = System.currentTimeMillis(); for (int i = 0; i < 3; i++) { for (int j = 0; j < 10; j++) { - allFileList.putIfAbsent(SG_NAME + i, new HashSet<>()); + allFileList.putIfAbsent(SG_NAME + i, new ArrayList<>()); correctUnSequenceLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>()); String rand = String.valueOf(r.nextInt(10000)); String fileName = - getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + rand + ".tsfile"; + getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + (time + i * 100 + + j) + IoTDBConstant.FILE_NAME_SEPARATOR + rand + + IoTDBConstant.FILE_NAME_SEPARATOR + "0.tsfile"; File syncFile = new File(fileName); File dataFile = new File( syncFile.getParentFile().getParentFile().getParentFile().getParentFile() .getParentFile(), IoTDBConstant.UNSEQUENCE_FLODER_NAME + File.separatorChar + syncFile.getParentFile().getName() + File.separatorChar + syncFile.getName()); - correctUnSequenceLoadedFileMap.get(SG_NAME + i).add(dataFile); + correctUnSequenceLoadedFileMap.get(SG_NAME + i).add(dataFile.getAbsolutePath()); allFileList.get(SG_NAME + i).add(syncFile); if (!syncFile.getParentFile().exists()) { syncFile.getParentFile().mkdirs(); @@ -214,7 +222,7 @@ public class FileLoaderTest { } assert getReceiverFolderFile().exists(); - for (Set<File> set : allFileList.values()) { + for (List<File> set : allFileList.values()) { for (File newTsFile : set) { if (!newTsFile.getName().endsWith(TsFileResource.RESOURCE_SUFFIX)) { fileLoader.addTsfile(newTsFile); @@ -242,30 +250,30 @@ public class FileLoaderTest { sequenceLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>()); assert processor.getSequenceFileList().size() == 10; for (TsFileResource tsFileResource : processor.getSequenceFileList()) { - sequenceLoadedFileMap.get(SG_NAME + i).add(tsFileResource.getFile()); + sequenceLoadedFileMap.get(SG_NAME + i).add(tsFileResource.getFile().getAbsolutePath()); } assert !processor.getUnSequenceFileList().isEmpty(); } assert sequenceLoadedFileMap.size() == correctSequenceLoadedFileMap.size(); - for (Entry<String, Set<File>> entry : correctSequenceLoadedFileMap.entrySet()) { + for (Entry<String, Set<String>> entry : correctSequenceLoadedFileMap.entrySet()) { String sg = entry.getKey(); assert entry.getValue().size() == sequenceLoadedFileMap.get(sg).size(); assert entry.getValue().containsAll(sequenceLoadedFileMap.get(sg)); } - Map<String, Set<File>> unsequenceLoadedFileMap = new HashMap<>(); + Map<String, Set<String>> unsequenceLoadedFileMap = new HashMap<>(); for (int i = 0; i < 3; i++) { StorageGroupProcessor processor = StorageEngine.getInstance().getProcessor(SG_NAME + i); unsequenceLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>()); assert processor.getUnSequenceFileList().size() == 10; for (TsFileResource tsFileResource : processor.getUnSequenceFileList()) { - unsequenceLoadedFileMap.get(SG_NAME + i).add(tsFileResource.getFile()); + unsequenceLoadedFileMap.get(SG_NAME + i).add(tsFileResource.getFile().getAbsolutePath()); } } assert unsequenceLoadedFileMap.size() == correctUnSequenceLoadedFileMap.size(); - for (Entry<String, Set<File>> entry : correctUnSequenceLoadedFileMap.entrySet()) { + for (Entry<String, Set<String>> entry : correctUnSequenceLoadedFileMap.entrySet()) { String sg = entry.getKey(); assert entry.getValue().size() == unsequenceLoadedFileMap.get(sg).size(); assert entry.getValue().containsAll(unsequenceLoadedFileMap.get(sg)); @@ -273,27 +281,28 @@ public class FileLoaderTest { } @Test - public void loadDeletedFileName() throws IOException, StorageEngineException { + public void loadDeletedFileName() throws IOException, StorageEngineException, InterruptedException { fileLoader = FileLoader.createFileLoader(getReceiverFolderFile()); - Map<String, Set<File>> allFileList = new HashMap<>(); - Map<String, Set<File>> correctLoadedFileMap = new HashMap<>(); + Map<String, List<File>> allFileList = new HashMap<>(); + Map<String, Set<String>> correctLoadedFileMap = new HashMap<>(); // add some tsfiles Random r = new Random(0); + long time = System.currentTimeMillis(); for (int i = 0; i < 3; i++) { for (int j = 0; j < 25; j++) { - allFileList.putIfAbsent(SG_NAME + i, new HashSet<>()); + allFileList.putIfAbsent(SG_NAME + i, new ArrayList<>()); correctLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>()); String rand = String.valueOf(r.nextInt(10000)); String fileName = - getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + rand + ".tsfile"; + getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + (time + i * 100 + + j) + IoTDBConstant.FILE_NAME_SEPARATOR + rand + + IoTDBConstant.FILE_NAME_SEPARATOR + "0.tsfile"; File syncFile = new File(fileName); File dataFile = new File( - syncFile.getParentFile().getParentFile().getParentFile().getParentFile() - .getParentFile(), IoTDBConstant.SEQUENCE_FLODER_NAME - + File.separatorChar + syncFile.getParentFile().getName() + File.separatorChar - + syncFile.getName()); - correctLoadedFileMap.get(SG_NAME + i).add(dataFile); + DirectoryManager.getInstance().getNextFolderForSequenceFile(), + syncFile.getParentFile().getName() + File.separatorChar + syncFile.getName()); + correctLoadedFileMap.get(SG_NAME + i).add(dataFile.getAbsolutePath()); allFileList.get(SG_NAME + i).add(syncFile); if (!syncFile.getParentFile().exists()) { syncFile.getParentFile().mkdirs(); @@ -318,7 +327,7 @@ public class FileLoaderTest { } assert getReceiverFolderFile().exists(); - for (Set<File> set : allFileList.values()) { + for (List<File> set : allFileList.values()) { for (File newTsFile : set) { if (!newTsFile.getName().endsWith(TsFileResource.RESOURCE_SUFFIX)) { fileLoader.addTsfile(newTsFile); @@ -340,19 +349,19 @@ public class FileLoaderTest { } assert !new File(getReceiverFolderFile(), SyncConstant.RECEIVER_DATA_FOLDER_NAME).exists(); - Map<String, Set<File>> loadedFileMap = new HashMap<>(); + Map<String, Set<String>> loadedFileMap = new HashMap<>(); for (int i = 0; i < 3; i++) { StorageGroupProcessor processor = StorageEngine.getInstance().getProcessor(SG_NAME + i); loadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>()); assert processor.getSequenceFileList().size() == 25; for (TsFileResource tsFileResource : processor.getSequenceFileList()) { - loadedFileMap.get(SG_NAME + i).add(tsFileResource.getFile()); + loadedFileMap.get(SG_NAME + i).add(tsFileResource.getFile().getAbsolutePath()); } assert processor.getUnSequenceFileList().isEmpty(); } assert loadedFileMap.size() == correctLoadedFileMap.size(); - for (Entry<String, Set<File>> entry : correctLoadedFileMap.entrySet()) { + for (Entry<String, Set<String>> entry : correctLoadedFileMap.entrySet()) { String sg = entry.getKey(); assert entry.getValue().size() == loadedFileMap.get(sg).size(); assert entry.getValue().containsAll(loadedFileMap.get(sg)); @@ -360,18 +369,16 @@ public class FileLoaderTest { // delete some tsfiles fileLoader = FileLoader.createFileLoader(getReceiverFolderFile()); - for(Entry<String, Set<File>> entry:allFileList.entrySet()){ + for(Entry<String, List<File>> entry:allFileList.entrySet()){ String sg = entry.getKey(); - Set<File> files = entry.getValue(); + List<File> files = entry.getValue(); int cnt = 0; for(File snapFile:files){ - if(!snapFile.getName().endsWith(TsFileResource.RESOURCE_SUFFIX)){ + if (!snapFile.getName().endsWith(TsFileResource.RESOURCE_SUFFIX)) { File dataFile = new File( - snapFile.getParentFile().getParentFile().getParentFile().getParentFile() - .getParentFile(), IoTDBConstant.SEQUENCE_FLODER_NAME - + File.separatorChar + snapFile.getParentFile().getName() + File.separatorChar - + snapFile.getName()); - correctLoadedFileMap.get(sg).remove(dataFile); + DirectoryManager.getInstance().getNextFolderForSequenceFile(), + snapFile.getParentFile().getName() + File.separatorChar + snapFile.getName()); + correctLoadedFileMap.get(sg).remove(dataFile.getAbsolutePath()); snapFile.delete(); fileLoader.addDeletedFileName(snapFile); new File(snapFile + TsFileResource.RESOURCE_SUFFIX).delete(); @@ -395,18 +402,18 @@ public class FileLoaderTest { LOGGER.error("Fail to wait for loading new tsfiles", e); } - loadedFileMap = new HashMap<>(); + loadedFileMap.clear(); for (int i = 0; i < 3; i++) { StorageGroupProcessor processor = StorageEngine.getInstance().getProcessor(SG_NAME + i); loadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>()); for (TsFileResource tsFileResource : processor.getSequenceFileList()) { - loadedFileMap.get(SG_NAME + i).add(tsFileResource.getFile()); + loadedFileMap.get(SG_NAME + i).add(tsFileResource.getFile().getAbsolutePath()); } assert processor.getUnSequenceFileList().isEmpty(); } assert loadedFileMap.size() == correctLoadedFileMap.size(); - for (Entry<String, Set<File>> entry : correctLoadedFileMap.entrySet()) { + for (Entry<String, Set<String>> entry : correctLoadedFileMap.entrySet()) { String sg = entry.getKey(); assert entry.getValue().size() == loadedFileMap.get(sg).size(); assert entry.getValue().containsAll(loadedFileMap.get(sg)); diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java index f2de4d0..4743059 100644 --- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java @@ -105,16 +105,17 @@ public class SyncReceiverLogAnalyzerTest { correctSequenceLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>()); String rand = String.valueOf(r.nextInt(10000)); String fileName = - getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + rand + ".tsfile"; + getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + System + .currentTimeMillis() + IoTDBConstant.FILE_NAME_SEPARATOR + rand + + IoTDBConstant.FILE_NAME_SEPARATOR + "0.tsfile"; File syncFile = new File(fileName); receiverLogger .finishSyncTsfile(syncFile); toBeSyncedFiles.add(syncFile.getAbsolutePath()); File dataFile = new File( - syncFile.getParentFile().getParentFile().getParentFile().getParentFile() - .getParentFile(), IoTDBConstant.SEQUENCE_FLODER_NAME - + File.separatorChar + syncFile.getParentFile().getName() + File.separatorChar - + syncFile.getName()); + DirectoryManager.getInstance().getNextFolderForSequenceFile(), + syncFile.getParentFile().getName() + File.separatorChar + + syncFile.getName()); correctSequenceLoadedFileMap.get(SG_NAME + i).add(dataFile); allFileList.get(SG_NAME + i).add(syncFile); if (!syncFile.getParentFile().exists()) { @@ -154,13 +155,12 @@ public class SyncReceiverLogAnalyzerTest { assert new File(getReceiverFolderFile(), SyncConstant.LOAD_LOG_NAME).exists(); assert new File(getReceiverFolderFile(), SyncConstant.SYNC_LOG_NAME).exists(); assert FileLoaderManager.getInstance().containsFileLoader(getReceiverFolderFile().getName()); - int count = 0, mode = 0; + int mode = 0; Set<String> toBeSyncedFilesTest = new HashSet<>(); try (BufferedReader br = new BufferedReader( new FileReader(new File(getReceiverFolderFile(), SyncConstant.SYNC_LOG_NAME)))) { String line; while ((line = br.readLine()) != null) { - count++; if (line.equals(SyncReceiverLogger.SYNC_DELETED_FILE_NAME_START)) { mode = -1; } else if (line.equals(SyncReceiverLogger.SYNC_TSFILE_START)) {
