This is an automated email from the ASF dual-hosted git repository. chaow pushed a commit to branch fix_sync_bug in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit de8f140afd35a2fe5ef22a1331ed4a819b8b5d9f Author: chaow <[email protected]> AuthorDate: Tue Jan 26 14:45:50 2021 +0800 fix sync bug for tsfiles's directory changed by vitural storage group --- .../org/apache/iotdb/db/engine/StorageEngine.java | 4 +- .../db/sync/sender/manage/ISyncFileManager.java | 11 +- .../db/sync/sender/manage/SyncFileManager.java | 137 +++++++++++++-------- .../iotdb/db/sync/sender/transfer/ISyncClient.java | 3 +- .../iotdb/db/sync/sender/transfer/SyncClient.java | 47 ++++--- .../java/org/apache/iotdb/db/utils/SyncUtils.java | 12 +- .../db/sync/sender/manage/SyncFileManagerTest.java | 116 +++++++++-------- .../sender/recover/SyncSenderLogAnalyzerTest.java | 51 +++++--- 8 files changed, 228 insertions(+), 153 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index 1170733..e25b5a5 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -773,7 +773,7 @@ public class StorageEngine implements IService { public void loadNewTsFileForSync(TsFileResource newTsFileResource) throws StorageEngineException, LoadFileException, IllegalPathException { - getProcessorDirectly(new PartialPath(getSgByEngineFile(newTsFileResource.getTsFile()))) + getProcessorDirectly(new PartialPath(newTsFileResource.getTsFile().getParentFile().getName())) .loadNewTsFileForSync(newTsFileResource); } @@ -791,7 +791,7 @@ public class StorageEngine implements IService { public boolean deleteTsfileForSync(File deletedTsfile) throws StorageEngineException, IllegalPathException { - return getProcessorDirectly(new PartialPath(getSgByEngineFile(deletedTsfile))) + return getProcessorDirectly(new PartialPath(deletedTsfile.getParentFile().getName())) .deleteTsfile(deletedTsfile); } diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java index 9fd9371..bfd9d77 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java @@ -55,13 +55,14 @@ public interface ISyncFileManager { */ void getValidFiles(String dataDir) throws IOException; - Map<String, Map<Long, Set<File>>> getCurrentSealedLocalFilesMap(); - Map<String, Map<Long, Set<File>>> getLastLocalFilesMap(); + Map<String, Map<Long, Map<Long, Set<File>>>> getCurrentSealedLocalFilesMap(); - Map<String, Map<Long, Set<File>>> getDeletedFilesMap(); + Map<String, Map<Long, Map<Long, Set<File>>>> getLastLocalFilesMap(); - Map<String, Map<Long, Set<File>>> getToBeSyncedFilesMap(); + Map<String, Map<Long, Map<Long, Set<File>>>> getDeletedFilesMap(); - Map<String, Set<Long>> getAllSGs(); + Map<String, Map<Long, Map<Long, Set<File>>>> getToBeSyncedFilesMap(); + + Map<String, Map<Long, Set<Long>>> getAllSGs(); } diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java index 0a41280..163daea 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java @@ -47,33 +47,35 @@ public class SyncFileManager implements ISyncFileManager { /** * All storage groups on the disk where the current sync task is executed + * logicalSg -> <virtualSg, timeRangeId> */ - private Map<String, Set<Long>> allSGs; + private Map<String, Map<Long, Set<Long>>> allSGs; /** * Key is storage group, value is all sealed tsfiles in the storage group. Inner key is time range * id, inner value is the set of current sealed tsfiles. + * logicalSg -> virtualSg -> <timeRangeId, files> */ - private Map<String, Map<Long, Set<File>>> currentSealedLocalFilesMap; + private Map<String, Map<Long, Map<Long, Set<File>>>> currentSealedLocalFilesMap; /** * Key is storage group, value is all last local tsfiles in the storage group, which doesn't * contains those tsfiles which are not synced successfully. Inner key is time range id, inner * value is the set of last local tsfiles. */ - private Map<String, Map<Long, Set<File>>> lastLocalFilesMap; + private Map<String, Map<Long, Map<Long, Set<File>>>> lastLocalFilesMap; /** * Key is storage group, value is all deleted tsfiles which need to be synced to receiver end in * the storage group. Inner key is time range id, inner value is the valid set of sealed tsfiles. */ - private Map<String, Map<Long, Set<File>>> deletedFilesMap; + private Map<String, Map<Long, Map<Long, Set<File>>>> deletedFilesMap; /** * Key is storage group, value is all new tsfiles which need to be synced to receiver end in the * storage group. Inner key is time range id, inner value is the valid set of new tsfiles. */ - private Map<String, Map<Long, Set<File>>> toBeSyncedFilesMap; + private Map<String, Map<Long, Map<Long, Set<File>>>> toBeSyncedFilesMap; private SyncFileManager() { IoTDB.metaManager.init(); @@ -90,7 +92,7 @@ public class SyncFileManager implements ISyncFileManager { currentSealedLocalFilesMap = new HashMap<>(); // get all files in data dir sequence folder - Map<String, Map<Long, Set<File>>> currentAllLocalFiles = new HashMap<>(); + Map<String, Map<Long, Map<Long, Set<File>>>> currentAllLocalFiles = new HashMap<>(); if (!new File(dataDir + File.separatorChar + IoTDBConstant.SEQUENCE_FLODER_NAME).exists()) { return; } @@ -102,35 +104,49 @@ public class SyncFileManager implements ISyncFileManager { .equals(TsFileConstant.TMP_SUFFIX)) { continue; } - allSGs.putIfAbsent(sgFolder.getName(), new HashSet<>()); + allSGs.putIfAbsent(sgFolder.getName(), new HashMap<>()); currentAllLocalFiles.putIfAbsent(sgFolder.getName(), new HashMap<>()); - for (File timeRangeFolder : sgFolder.listFiles()) { - try { - Long timeRangeId = Long.parseLong(timeRangeFolder.getName()); - currentAllLocalFiles.get(sgFolder.getName()).putIfAbsent(timeRangeId, new HashSet<>()); - File[] files = timeRangeFolder.listFiles(); - Arrays.stream(files) - .forEach(file -> currentAllLocalFiles.get(sgFolder.getName()).get(timeRangeId) - .add(new File(timeRangeFolder.getAbsolutePath(), file.getName()))); - } catch (Exception e) { - LOGGER.error("Invalid time range folder: {}", timeRangeFolder.getAbsolutePath(), e); - } + for (File virtualSgFolder : sgFolder.listFiles()) { + try { + Long vgId = Long.parseLong(virtualSgFolder.getName()); + allSGs.get(sgFolder.getName()).putIfAbsent(vgId, new HashSet<>()); + currentAllLocalFiles.get(sgFolder.getName()).putIfAbsent(vgId, new HashMap<>()); + + for (File timeRangeFolder : virtualSgFolder.listFiles()) { + try { + Long timeRangeId = Long.parseLong(timeRangeFolder.getName()); + currentAllLocalFiles.get(sgFolder.getName()).get(vgId).putIfAbsent(timeRangeId, new HashSet<>()); + File[] files = timeRangeFolder.listFiles(); + Arrays.stream(files) + .forEach(file -> currentAllLocalFiles.get(sgFolder.getName()).get(vgId).get(timeRangeId) + .add(new File(timeRangeFolder.getAbsolutePath(), file.getName()))); + } catch (Exception e) { + LOGGER.error("Invalid time range folder: {}", timeRangeFolder.getAbsolutePath(), e); + } + } + } catch (Exception e) { + LOGGER.error("Invalid virtual storage group folder: {}", virtualSgFolder.getAbsolutePath(), e); + } } } // get sealed tsfiles - for (Entry<String, Map<Long, Set<File>>> entry : currentAllLocalFiles.entrySet()) { + for (Entry<String, Map<Long, Map<Long, Set<File>>>> entry : currentAllLocalFiles.entrySet()) { String sgName = entry.getKey(); currentSealedLocalFilesMap.putIfAbsent(sgName, new HashMap<>()); - for (Entry<Long, Set<File>> innerEntry : entry.getValue().entrySet()) { - Long timeRangeId = innerEntry.getKey(); - currentSealedLocalFilesMap.get(sgName).putIfAbsent(timeRangeId, new HashSet<>()); - for (File file : innerEntry.getValue()) { - if (!file.getName().endsWith(TSFILE_SUFFIX)) { - continue; - } - if (checkFileValidity(file)) { - currentSealedLocalFilesMap.get(sgName).get(timeRangeId).add(file); + for (Entry<Long, Map<Long, Set<File>>> vgEntry : entry.getValue().entrySet()) { + Long vgId = vgEntry.getKey(); + currentSealedLocalFilesMap.get(sgName).putIfAbsent(vgId, new HashMap<>()); + for (Entry<Long, Set<File>> innerEntry : vgEntry.getValue().entrySet()) { + Long timeRangeId = innerEntry.getKey(); + currentSealedLocalFilesMap.get(sgName).get(vgId).putIfAbsent(timeRangeId, new HashSet<>()); + for (File file : innerEntry.getValue()) { + if (!file.getName().endsWith(TSFILE_SUFFIX)) { + continue; + } + if (checkFileValidity(file)) { + currentSealedLocalFilesMap.get(sgName).get(vgId).get(timeRangeId).add(file); + } } } } @@ -157,10 +173,13 @@ public class SyncFileManager implements ISyncFileManager { while ((filePath = reader.readLine()) != null) { File file = new File(filePath); Long timeRangeId = Long.parseLong(file.getParentFile().getName()); - String sgName = file.getParentFile().getParentFile().getName(); - allSGs.putIfAbsent(sgName, new HashSet<>()); + Long vgId = Long.parseLong(file.getParentFile().getParentFile().getName()); + String sgName = file.getParentFile().getParentFile().getParentFile().getName(); + allSGs.putIfAbsent(sgName, new HashMap<>()); + allSGs.get(sgName).putIfAbsent(vgId, new HashSet<>()); lastLocalFilesMap.computeIfAbsent(sgName, k -> new HashMap<>()) - .computeIfAbsent(timeRangeId, k -> new HashSet<>()).add(file); + .computeIfAbsent(vgId, k -> new HashMap<>()) + .computeIfAbsent(timeRangeId, k -> new HashSet<>()).add(file); } } } @@ -177,28 +196,44 @@ public class SyncFileManager implements ISyncFileManager { for (String sgName : allSGs.keySet()) { toBeSyncedFilesMap.putIfAbsent(sgName, new HashMap<>()); deletedFilesMap.putIfAbsent(sgName, new HashMap<>()); - for (Entry<Long, Set<File>> entry : currentSealedLocalFilesMap - .getOrDefault(sgName, Collections.emptyMap()).entrySet()) { - Long timeRangeId = entry.getKey(); - toBeSyncedFilesMap.get(sgName).putIfAbsent(timeRangeId, new HashSet<>()); - allSGs.get(sgName).add(timeRangeId); - for (File newFile : entry.getValue()) { - if (!lastLocalFilesMap.getOrDefault(sgName, Collections.emptyMap()) + + for (Entry<Long, Map<Long, Set<File>>> entry : currentSealedLocalFilesMap + .getOrDefault(sgName, Collections.emptyMap()).entrySet()) { + Long vgId = entry.getKey(); + toBeSyncedFilesMap.get(sgName).putIfAbsent(vgId, new HashMap<>()); + allSGs.get(sgName).putIfAbsent(vgId, new HashSet<>()); + + for (Entry<Long, Set<File>> innerEntry : entry.getValue().entrySet()) { + Long timeRangeId = innerEntry.getKey(); + toBeSyncedFilesMap.get(sgName).get(vgId).putIfAbsent(timeRangeId, new HashSet<>()); + allSGs.get(sgName).get(vgId).add(timeRangeId); + for (File newFile : innerEntry.getValue()) { + if (!lastLocalFilesMap.getOrDefault(sgName, Collections.emptyMap()) + .getOrDefault(vgId, Collections.emptyMap()) .getOrDefault(timeRangeId, Collections.emptySet()).contains(newFile)) { - toBeSyncedFilesMap.get(sgName).get(timeRangeId).add(newFile); + toBeSyncedFilesMap.get(sgName).get(vgId).get(timeRangeId).add(newFile); + } } } } - for (Entry<Long, Set<File>> entry : lastLocalFilesMap + for (Entry<Long, Map<Long, Set<File>>> entry : lastLocalFilesMap .getOrDefault(sgName, Collections.emptyMap()).entrySet()) { - Long timeRangeId = entry.getKey(); - deletedFilesMap.get(sgName).putIfAbsent(timeRangeId, new HashSet<>()); - allSGs.get(sgName).add(timeRangeId); - for (File oldFile : entry.getValue()) { - if (!currentSealedLocalFilesMap.getOrDefault(sgName, Collections.emptyMap()) + Long vgId = entry.getKey(); + deletedFilesMap.get(sgName).putIfAbsent(vgId, new HashMap<>()); + allSGs.get(sgName).putIfAbsent(vgId, new HashSet<>()); + + for (Entry<Long, Set<File>> innerEntry : entry.getValue().entrySet()) { + Long timeRangeId = innerEntry.getKey(); + deletedFilesMap.get(sgName).get(vgId).putIfAbsent(timeRangeId, new HashSet<>()); + allSGs.get(sgName).get(vgId).add(timeRangeId); + + for (File oldFile : innerEntry.getValue()) { + if (!currentSealedLocalFilesMap.getOrDefault(sgName, Collections.emptyMap()) + .getOrDefault(vgId, Collections.emptyMap()) .getOrDefault(timeRangeId, Collections.emptySet()).contains(oldFile)) { - deletedFilesMap.get(sgName).get(timeRangeId).add(oldFile); + deletedFilesMap.get(sgName).get(vgId).get(timeRangeId).add(oldFile); + } } } } @@ -206,27 +241,27 @@ public class SyncFileManager implements ISyncFileManager { } @Override - public Map<String, Map<Long, Set<File>>> getCurrentSealedLocalFilesMap() { + public Map<String, Map<Long, Map<Long, Set<File>>>> getCurrentSealedLocalFilesMap() { return currentSealedLocalFilesMap; } @Override - public Map<String, Map<Long, Set<File>>> getLastLocalFilesMap() { + public Map<String, Map<Long, Map<Long, Set<File>>>> getLastLocalFilesMap() { return lastLocalFilesMap; } @Override - public Map<String, Map<Long, Set<File>>> getDeletedFilesMap() { + public Map<String, Map<Long, Map<Long, Set<File>>>> getDeletedFilesMap() { return deletedFilesMap; } @Override - public Map<String, Map<Long, Set<File>>> getToBeSyncedFilesMap() { + public Map<String, Map<Long, Map<Long, Set<File>>>> getToBeSyncedFilesMap() { return toBeSyncedFilesMap; } @Override - public Map<String, Set<Long>> getAllSGs() { + public Map<String, Map<Long, Set<Long>>> getAllSGs() { return allSGs; } diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/ISyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/ISyncClient.java index 15323bc..1d501ca 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/ISyncClient.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/ISyncClient.java @@ -92,10 +92,11 @@ public interface ISyncClient { * receiver. * * @param sgName storage group name + * @param vgId virtual group id * @param timeRangeId id of time range * @param toBeSyncFiles list of new tsfile names */ - void syncDataFilesInOneGroup(String sgName, Long timeRangeId, Set<File> toBeSyncFiles) + void syncDataFilesInOneGroup(String sgName, Long vgId, Long timeRangeId, Set<File> toBeSyncFiles) throws SyncConnectionException, IOException, SyncDeviceOwnerConflictException; /** diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java index b88c0de..71ede28 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java @@ -105,13 +105,13 @@ public class SyncClient implements ISyncClient { private SyncService.Client serviceClient; - private Map<String, Set<Long>> allSG; + private Map<String, Map<Long, Set<Long>>> allSG; - private Map<String, Map<Long, Set<File>>> toBeSyncedFilesMap; + private Map<String, Map<Long, Map<Long, Set<File>>>> toBeSyncedFilesMap; - private Map<String, Map<Long, Set<File>>> deletedFilesMap; + private Map<String, Map<Long, Map<Long, Set<File>>>> deletedFilesMap; - private Map<String, Map<Long, Set<File>>> lastLocalFilesMap; + private Map<String, Map<Long, Map<Long, Set<File>>>> lastLocalFilesMap; /** * If true, sync is in execution. @@ -454,7 +454,7 @@ public class SyncClient implements ISyncClient { syncStatus = true; List<String> storageGroups = config.getStorageGroupList(); - for (Entry<String, Set<Long>> entry : allSG.entrySet()) { + for (Entry<String, Map<Long, Set<Long>>> entry : allSG.entrySet()) { String sgName = entry.getKey(); if (!storageGroups.isEmpty() && !storageGroups.contains(sgName)) { continue; @@ -470,17 +470,22 @@ public class SyncClient implements ISyncClient { throw new SyncConnectionException("Unable to connect to receiver", e); } logger.info( - "Sync process starts to transfer data of storage group {}, it has {} time ranges.", + "Sync process starts to transfer data of storage group {}, it has {} virtual storage group.", sgName, entry.getValue().size()); try { - for (Long timeRangeId : entry.getValue()) { - lastLocalFilesMap.get(sgName).putIfAbsent(timeRangeId, new HashSet<>()); - syncDeletedFilesNameInOneGroup(sgName, timeRangeId, + for (Entry<Long, Set<Long>> vgEntry : entry.getValue().entrySet()) { + lastLocalFilesMap.get(sgName).putIfAbsent(vgEntry.getKey(), new HashMap<>()); + for (Long timeRangeId : vgEntry.getValue()) { + lastLocalFilesMap.get(sgName).get(vgEntry.getKey()).putIfAbsent(timeRangeId, new HashSet<>()); + syncDeletedFilesNameInOneGroup(sgName, timeRangeId, deletedFilesMap.getOrDefault(sgName, Collections.emptyMap()) - .getOrDefault(timeRangeId, Collections.emptySet())); - syncDataFilesInOneGroup(sgName, timeRangeId, + .getOrDefault(vgEntry.getKey(), Collections.emptyMap()) + .getOrDefault(timeRangeId, Collections.emptySet())); + syncDataFilesInOneGroup(sgName, vgEntry.getKey(), timeRangeId, toBeSyncedFilesMap.getOrDefault(sgName, Collections.emptyMap()) - .getOrDefault(timeRangeId, Collections.emptySet())); + .getOrDefault(vgEntry.getKey(), Collections.emptyMap()) + .getOrDefault(timeRangeId, Collections.emptySet())); + } } } catch (SyncDeviceOwnerConflictException e) { deletedFilesMap.remove(sgName); @@ -528,7 +533,7 @@ public class SyncClient implements ISyncClient { } @Override - public void syncDataFilesInOneGroup(String sgName, Long timeRangeId, Set<File> toBeSyncFiles) + public void syncDataFilesInOneGroup(String sgName, Long vgId, Long timeRangeId, Set<File> toBeSyncFiles) throws SyncConnectionException, IOException, SyncDeviceOwnerConflictException { if (toBeSyncFiles.isEmpty()) { logger.info("There has no new tsfiles to be synced in storage group {}", sgName); @@ -544,7 +549,7 @@ public class SyncClient implements ISyncClient { // firstly sync .resource file, then sync tsfile syncSingleFile(new File(snapshotFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX)); syncSingleFile(snapshotFile); - lastLocalFilesMap.get(sgName).get(timeRangeId).add(tsfile); + lastLocalFilesMap.get(sgName).get(vgId).get(timeRangeId).add(tsfile); syncLog.finishSyncTsfile(tsfile); logger.info("Task of synchronization has completed {}/{}.", cnt, toBeSyncFiles.size()); } catch (IOException e) { @@ -638,13 +643,15 @@ public class SyncClient implements ISyncClient { // 1. Write file list to currentLocalFile try (BufferedWriter bw = new BufferedWriter(new FileWriter(currentLocalFile))) { - for (Map<Long, Set<File>> currentLocalFiles : lastLocalFilesMap.values()) { - for (Set<File> files : currentLocalFiles.values()) { - for (File file : files) { - bw.write(file.getAbsolutePath()); - bw.newLine(); + for (Map<Long, Map<Long, Set<File>>> vgCurrentLocalFiles : lastLocalFilesMap.values()) { + for (Map<Long, Set<File>> currentLocalFiles : vgCurrentLocalFiles.values()) { + for (Set<File> files : currentLocalFiles.values()) { + for (File file : files) { + bw.write(file.getAbsolutePath()); + bw.newLine(); + } + bw.flush(); } - bw.flush(); } } } catch (IOException e) { diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java index 2401a01..e5446a0 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java @@ -49,11 +49,13 @@ public class SyncUtils { /** * Verify sending list is empty or not It's used by sync sender. */ - public static boolean isEmpty(Map<String, Map<Long, Set<File>>> sendingFileList) { - for (Entry<String, Map<Long, Set<File>>> entry: sendingFileList.entrySet()) { - for(Entry<Long, Set<File>> innerEntry: entry.getValue().entrySet()) { - if (!innerEntry.getValue().isEmpty()) { - return false; + public static boolean isEmpty(Map<String, Map<Long, Map<Long, Set<File>>>> sendingFileList) { + for (Entry<String, Map<Long, Map<Long, Set<File>>>> entry: sendingFileList.entrySet()) { + for (Entry<Long, Map<Long, Set<File>>> vgEntry : entry.getValue().entrySet()) { + for (Entry<Long, Set<File>> innerEntry : vgEntry.getValue().entrySet()) { + if (!innerEntry.getValue().isEmpty()) { + return false; + } } } } diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java index e742d57..c82f346 100644 --- a/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java @@ -71,7 +71,7 @@ public class SyncFileManagerTest { @Test public void testGetValidFiles() throws IOException, MetadataException { - Map<String, Map<Long, Set<File>>> allFileList = new HashMap<>(); + Map<String, Map<Long, Map<Long, Set<File>>>> allFileList = new HashMap<>(); Random r = new Random(0); for (int i = 0; i < 3; i++) { @@ -80,12 +80,13 @@ public class SyncFileManagerTest { for (int i = 0; i < 3; i++) { for (int j = 0; j < 5; j++) { allFileList.computeIfAbsent(getSgName(i), k -> new HashMap<>()) + .computeIfAbsent(0L, k -> new HashMap<>()) .computeIfAbsent(0L, k -> new HashSet<>()); String rand = r.nextInt(10000) + TSFILE_SUFFIX; String fileName = FilePathUtils.regularizePath(dataDir) + IoTDBConstant.SEQUENCE_FLODER_NAME - + File.separator + getSgName(i) + File.separator + "0" + File.separator + rand; + + File.separator + getSgName(i) + File.separator + "0" + File.separator + "0" + File.separator + rand; File file = new File(fileName); - allFileList.get(getSgName(i)).get(0L).add(file); + allFileList.get(getSgName(i)).get(0L).get(0L).add(file); if (!file.getParentFile().exists()) { file.getParentFile().mkdirs(); } @@ -106,25 +107,27 @@ public class SyncFileManagerTest { updateLastLocalFiles(allFileList); manager.getValidFiles(dataDir); - Map<String, Map<Long, Set<File>>> lastFileMap = manager.getLastLocalFilesMap(); + Map<String, Map<Long, Map<Long, Set<File>>>> lastFileMap = manager.getLastLocalFilesMap(); assertFileMap(allFileList, lastFileMap); // add some files - Map<String, Map<Long, Set<File>>> correctToBeSyncedFiles = new HashMap<>(); + Map<String, Map<Long, Map<Long, Set<File>>>> correctToBeSyncedFiles = new HashMap<>(); r = new Random(1); for (int i = 0; i < 3; i++) { for (int j = 0; j < 5; j++) { allFileList.computeIfAbsent(getSgName(i), k -> new HashMap<>()) + .computeIfAbsent(0L, k -> new HashMap<>()) .computeIfAbsent(0L, k -> new HashSet<>()); correctToBeSyncedFiles.computeIfAbsent(getSgName(i), k -> new HashMap<>()) + .computeIfAbsent(0L, k -> new HashMap<>()) .computeIfAbsent(0L, k -> new HashSet<>()); String rand = r.nextInt(10000) + TSFILE_SUFFIX; String fileName = FilePathUtils.regularizePath(dataDir) + IoTDBConstant.SEQUENCE_FLODER_NAME - + File.separator + getSgName(i) + File.separator + "0" + File.separator + rand; + + File.separator + getSgName(i) + File.separator + "0" + File.separator + "0" + File.separator + rand; File file = new File(fileName); - allFileList.get(getSgName(i)).get(0L).add(file); - correctToBeSyncedFiles.get(getSgName(i)).get(0L).add(file); + allFileList.get(getSgName(i)).get(0L).get(0L).add(file); + correctToBeSyncedFiles.get(getSgName(i)).get(0L).get(0L).add(file); if (!file.getParentFile().exists()) { file.getParentFile().mkdirs(); } @@ -138,8 +141,8 @@ public class SyncFileManagerTest { } } manager.getValidFiles(dataDir); - Map<String, Map<Long, Set<File>>> curFileMap = manager.getCurrentSealedLocalFilesMap(); - Map<String, Map<Long, Set<File>>> toBeSyncedFilesMap = manager.getToBeSyncedFilesMap(); + Map<String, Map<Long, Map<Long, Set<File>>>> curFileMap = manager.getCurrentSealedLocalFilesMap(); + Map<String, Map<Long, Map<Long, Set<File>>>> toBeSyncedFilesMap = manager.getToBeSyncedFilesMap(); assertFileMap(allFileList, curFileMap); assertFileMap(correctToBeSyncedFiles, toBeSyncedFilesMap); @@ -155,17 +158,19 @@ public class SyncFileManagerTest { for (int i = 0; i < 3; i++) { for (int j = 0; j < 5; j++) { allFileList.computeIfAbsent(getSgName(i), k -> new HashMap<>()) - .computeIfAbsent(0L, k -> new HashSet<>()); + .computeIfAbsent(0L, k -> new HashMap<>()) + .computeIfAbsent(0L, k -> new HashSet<>()); correctToBeSyncedFiles.computeIfAbsent(getSgName(i), k -> new HashMap<>()) - .computeIfAbsent(0L, k -> new HashSet<>()); + .computeIfAbsent(0L, k -> new HashMap<>()) + .computeIfAbsent(0L, k -> new HashSet<>()); String rand = r.nextInt(10000) + TSFILE_SUFFIX; String fileName = FilePathUtils.regularizePath(dataDir) + IoTDBConstant.SEQUENCE_FLODER_NAME - + File.separator + getSgName(i) + File.separator + "0" + File.separator + + File.separator + getSgName(i) + File.separator + "0" + File.separator + "0" + File.separator + File.separator + rand; File file = new File(fileName); - allFileList.get(getSgName(i)).get(0L).add(file); - correctToBeSyncedFiles.get(getSgName(i)).get(0L).add(file); + allFileList.get(getSgName(i)).get(0L).get(0L).add(file); + correctToBeSyncedFiles.get(getSgName(i)).get(0L).get(0L).add(file); if (!file.getParentFile().exists()) { file.getParentFile().mkdirs(); } @@ -179,35 +184,41 @@ public class SyncFileManagerTest { } } int count = 0; - Map<String, Map<Long, Set<File>>> correctDeleteFile = new HashMap<>(); - for (Entry<String, Map<Long, Set<File>>> entry : allFileList.entrySet()) { + Map<String, Map<Long, Map<Long, Set<File>>>> correctDeleteFile = new HashMap<>(); + for (Entry<String, Map<Long, Map<Long, Set<File>>>> entry : allFileList.entrySet()) { correctDeleteFile.put(entry.getKey(), new HashMap<>()); - for (Entry<Long, Set<File>> innerEntry : entry.getValue().entrySet()) { - Set<File> files = innerEntry.getValue(); - correctDeleteFile.get(entry.getKey()).putIfAbsent(innerEntry.getKey(), new HashSet<>()); - for (File file : files) { - count++; - if (count % 3 == 0 && lastFileMap.get(entry.getKey()).get(0L).contains(file)) { - correctDeleteFile.get(entry.getKey()).get(0L).add(file); + for (Entry<Long, Map<Long, Set<File>>> vgEntry : entry.getValue().entrySet()) { + correctDeleteFile.get(entry.getKey()).putIfAbsent(vgEntry.getKey(), new HashMap<>()); + for (Entry<Long, Set<File>> innerEntry : vgEntry.getValue().entrySet()) { + Set<File> files = innerEntry.getValue(); + correctDeleteFile.get(entry.getKey()).get(vgEntry.getKey()).putIfAbsent(innerEntry.getKey(), new HashSet<>()); + for (File file : files) { + count++; + if (count % 3 == 0 && lastFileMap.get(entry.getKey()).get(0L).get(0L).contains(file)) { + correctDeleteFile.get(entry.getKey()).get(0L).get(0L).add(file); + } } } } } - for (Entry<String, Map<Long, Set<File>>> entry : correctDeleteFile.entrySet()) { + for (Entry<String, Map<Long, Map<Long, Set<File>>>> entry : correctDeleteFile.entrySet()) { correctDeleteFile.put(entry.getKey(), new HashMap<>()); - for (Entry<Long, Set<File>> innerEntry : entry.getValue().entrySet()) { - Set<File> files = innerEntry.getValue(); - correctDeleteFile.get(entry.getKey()).putIfAbsent(innerEntry.getKey(), new HashSet<>()); - for (File file : innerEntry.getValue()) { - file.delete(); - new File(file.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).delete(); - allFileList.get(entry.getKey()).get(0L).remove(file); + for (Entry<Long, Map<Long, Set<File>>> vgEntry : entry.getValue().entrySet()) { + correctDeleteFile.get(entry.getKey()).putIfAbsent(vgEntry.getKey(), new HashMap<>()); + for (Entry<Long, Set<File>> innerEntry : vgEntry.getValue().entrySet()) { + Set<File> files = innerEntry.getValue(); + correctDeleteFile.get(entry.getKey()).get(vgEntry.getKey()).putIfAbsent(innerEntry.getKey(), new HashSet<>()); + for (File file : innerEntry.getValue()) { + file.delete(); + new File(file.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).delete(); + allFileList.get(entry.getKey()).get(0L).get(0L).remove(file); + } } } } manager.getValidFiles(dataDir); curFileMap = manager.getCurrentSealedLocalFilesMap(); - Map<String, Map<Long, Set<File>>> deletedFilesMap = manager.getDeletedFilesMap(); + Map<String, Map<Long, Map<Long, Set<File>>>> deletedFilesMap = manager.getDeletedFilesMap(); toBeSyncedFilesMap = manager.getToBeSyncedFilesMap(); assertFileMap(allFileList, curFileMap); assertFileMap(correctDeleteFile, deletedFilesMap); @@ -218,14 +229,15 @@ public class SyncFileManagerTest { for (int i = 0; i < 3; i++) { for (int j = 0; j < 5; j++) { allFileList.computeIfAbsent(getSgName(i), k -> new HashMap<>()) - .computeIfAbsent(0L, k -> new HashSet<>()); + .computeIfAbsent(0L, k -> new HashMap<>()) + .computeIfAbsent(0L, k -> new HashSet<>()); String rand = String.valueOf(r.nextInt(10000)); String fileName = FilePathUtils.regularizePath(dataDir) + IoTDBConstant.SEQUENCE_FLODER_NAME - + File.separator + getSgName(i) + File.separator + "0" + File.separator + + File.separator + getSgName(i) + File.separator + "0" + File.separator + "0" + File.separator + File.separator + rand; File file = new File(fileName); - allFileList.get(getSgName(i)).get(0L).add(file); + allFileList.get(getSgName(i)).get(0L).get(0L).add(file); if (!file.getParentFile().exists()) { file.getParentFile().mkdirs(); } @@ -245,13 +257,17 @@ public class SyncFileManagerTest { assertFileMap(correctToBeSyncedFiles, toBeSyncedFilesMap); } - private void assertFileMap(Map<String, Map<Long, Set<File>>> correctMap, - Map<String, Map<Long, Set<File>>> curMap) { - for (Entry<String, Map<Long, Set<File>>> entry : correctMap.entrySet()) { + private void assertFileMap(Map<String, Map<Long, Map<Long, Set<File>>>> correctMap, + Map<String, Map<Long, Map<Long, Set<File>>>> curMap) { + for (Entry<String, Map<Long, Map<Long, Set<File>>>> entry : correctMap.entrySet()) { assertTrue(curMap.containsKey(entry.getKey())); - for (Entry<Long, Set<File>> innerEntry : entry.getValue().entrySet()) { - assertTrue( - curMap.get(entry.getKey()).get(innerEntry.getKey()).containsAll(innerEntry.getValue())); + for (Entry<Long, Map<Long, Set<File>>> innerEntry : entry.getValue().entrySet()) { + assertTrue(curMap.get(entry.getKey()).containsKey(innerEntry.getKey())); + for (Entry<Long, Set<File>> fileEntry : innerEntry.getValue().entrySet()) { + assertTrue( + curMap.get(entry.getKey()).get(innerEntry.getKey()) + .get(fileEntry.getKey()).containsAll(fileEntry.getValue())); + } } } } @@ -260,16 +276,18 @@ public class SyncFileManagerTest { return IoTDBConstant.PATH_ROOT + IoTDBConstant.PATH_SEPARATOR + i; } - private void updateLastLocalFiles(Map<String, Map<Long, Set<File>>> lastLocalFilesMap) { + private void updateLastLocalFiles(Map<String, Map<Long, Map<Long, Set<File>>>> lastLocalFilesMap) { try (BufferedWriter bw = new BufferedWriter( new FileWriter(new File(config.getLastFileInfoPath())))) { - for (Map<Long, Set<File>> currentLocalFiles : lastLocalFilesMap.values()) { - for (Set<File> files : currentLocalFiles.values()) { - for (File file : files) { - bw.write(file.getAbsolutePath()); - bw.newLine(); + for (Map<Long, Map<Long, Set<File>>> currentLocalFiles : lastLocalFilesMap.values()) { + for (Map<Long, Set<File>> vgFiles : currentLocalFiles.values()) { + for (Set<File> files : vgFiles.values()) { + for (File file : files) { + bw.write(file.getAbsolutePath()); + bw.newLine(); + } + bw.flush(); } - bw.flush(); } } } catch (IOException e) { diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java index 4609fb5..453e022 100644 --- a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java @@ -48,6 +48,7 @@ import java.util.*; import java.util.Map.Entry; import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -79,7 +80,7 @@ public class SyncSenderLogAnalyzerTest { @Test public void recover() throws IOException, MetadataException { - Map<String, Map<Long, Set<File>>> allFileList = new HashMap<>(); + Map<String, Map<Long, Map<Long, Set<File>>>> allFileList = new HashMap<>(); for (int i = 0; i < 3; i++) { IoTDB.metaManager.setStorageGroup(new PartialPath(getSgName(i))); @@ -87,12 +88,14 @@ public class SyncSenderLogAnalyzerTest { Random r = new Random(0); for (int i = 0; i < 3; i++) { for (int j = 0; j < 5; j++) { - allFileList.computeIfAbsent(getSgName(i), k -> new HashMap<>()).computeIfAbsent(0L, k -> new HashSet<>()); + allFileList.computeIfAbsent(getSgName(i), k -> new HashMap<>()) + .computeIfAbsent(0L, k -> new HashMap<>()) + .computeIfAbsent(0L, k -> new HashSet<>()); String rand = r.nextInt(10000) + TSFILE_SUFFIX; String fileName = FilePathUtils.regularizePath(dataDir) + IoTDBConstant.SEQUENCE_FLODER_NAME - + File.separator + getSgName(i) + File.separator + "0" + File.separator + rand; + + File.separator + getSgName(i) + File.separator + "0" + File.separator + "0" + File.separator + rand; File file = new File(fileName); - allFileList.get(getSgName(i)).get(0L).add(file); + allFileList.get(getSgName(i)).get(0L).get(0L).add(file); if (!file.getParentFile().exists()) { file.getParentFile().mkdirs(); } @@ -108,10 +111,12 @@ public class SyncSenderLogAnalyzerTest { manager.getValidFiles(dataDir); assertTrue(SyncUtils.isEmpty(manager.getLastLocalFilesMap())); senderLogger.startSyncTsFiles(); - for (Map<Long, Set<File>> map : allFileList.values()) { - for (Set<File> newTsFiles : map.values()) { - for (File file : newTsFiles) { - senderLogger.finishSyncTsfile(file); + for (Map<Long, Map<Long, Set<File>>> map : allFileList.values()) { + for (Map<Long, Set<File>> vgMap : map.values()) { + for (Set<File> newTsFiles : vgMap.values()) { + for (File file : newTsFiles) { + senderLogger.finishSyncTsfile(file); + } } } } @@ -121,7 +126,7 @@ public class SyncSenderLogAnalyzerTest { senderLogAnalyzer.recover(); manager.getValidFiles(dataDir); assertFalse(SyncUtils.isEmpty(manager.getLastLocalFilesMap())); - Map<String, Map<Long, Set<File>>> lastFilesMap = manager.getLastLocalFilesMap(); + Map<String, Map<Long, Map<Long, Set<File>>>> lastFilesMap = manager.getLastLocalFilesMap(); assertFileMap(allFileList, lastFilesMap); // delete some files @@ -131,10 +136,12 @@ public class SyncSenderLogAnalyzerTest { manager.getValidFiles(dataDir); assertFalse(SyncUtils.isEmpty(manager.getLastLocalFilesMap())); senderLogger.startSyncDeletedFilesName(); - for (Map<Long, Set<File>> map : allFileList.values()) { - for (Set<File> newTsFiles : map.values()) { - for (File file : newTsFiles) { - senderLogger.finishSyncDeletedFileName(file); + for (Map<Long, Map<Long, Set<File>>> map : allFileList.values()) { + for (Map<Long, Set<File>> vgMap : map.values()) { + for (Set<File> newTsFiles : vgMap.values()) { + for (File file : newTsFiles) { + senderLogger.finishSyncDeletedFileName(file); + } } } } @@ -144,17 +151,21 @@ public class SyncSenderLogAnalyzerTest { manager.getValidFiles(dataDir); assertTrue(SyncUtils.isEmpty(manager.getLastLocalFilesMap())); assertTrue(SyncUtils.isEmpty(manager.getDeletedFilesMap())); - Map<String, Map<Long, Set<File>>> toBeSyncedFilesMap = manager.getToBeSyncedFilesMap(); + Map<String, Map<Long, Map<Long, Set<File>>>> toBeSyncedFilesMap = manager.getToBeSyncedFilesMap(); assertFileMap(allFileList, toBeSyncedFilesMap); } - private void assertFileMap(Map<String, Map<Long, Set<File>>> correctMap, - Map<String, Map<Long, Set<File>>> curMap) { - for (Entry<String, Map<Long, Set<File>>> entry : correctMap.entrySet()) { + private void assertFileMap(Map<String, Map<Long, Map<Long, Set<File>>>> correctMap, + Map<String, Map<Long, Map<Long, Set<File>>>> curMap) { + for (Entry<String, Map<Long, Map<Long, Set<File>>>> entry : correctMap.entrySet()) { assertTrue(curMap.containsKey(entry.getKey())); - for (Entry<Long, Set<File>> innerEntry : entry.getValue().entrySet()) { - assertTrue( - curMap.get(entry.getKey()).get(innerEntry.getKey()).containsAll(innerEntry.getValue())); + for (Entry<Long, Map<Long, Set<File>>> vgEntry : entry.getValue().entrySet()) { + assertTrue(curMap.get(entry.getKey()).containsKey(vgEntry.getKey())); + for (Entry<Long, Set<File>> innerEntry : vgEntry.getValue().entrySet()) { + assertTrue( + curMap.get(entry.getKey()).get(vgEntry.getKey()) + .get(innerEntry.getKey()).containsAll(innerEntry.getValue())); + } } } }
