This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch reimpl_sync in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit e655ae34915b9df6e0a96689532a061a6ee8b120 Author: lta <[email protected]> AuthorDate: Tue Sep 3 16:13:30 2019 +0800 add class comments --- .../resources/conf/iotdb-sync-client.properties | 5 ++ .../iotdb/db/sync/receiver/SyncServerManager.java | 6 ++- .../iotdb/db/sync/receiver/load/FileLoader.java | 10 ++-- .../db/sync/receiver/load/FileLoaderManager.java | 4 +- .../iotdb/db/sync/receiver/load/IFileLoader.java | 19 +++++++ .../iotdb/db/sync/receiver/load/ILoadLogger.java | 23 ++++++++ .../sync/receiver/recover/ISyncReceiverLogger.java | 12 ++--- .../receiver/recover/SyncReceiverLogAnalyzer.java | 18 +++---- .../db/sync/receiver/transfer/SyncServiceImpl.java | 13 ++--- .../conf/{Constans.java => SyncConstant.java} | 8 +-- .../db/sync/sender/conf/SyncSenderConfig.java | 41 +++++++++----- .../db/sync/sender/conf/SyncSenderDescriptor.java | 20 +++++-- .../db/sync/sender/manage/ISyncFileManager.java | 8 +-- .../db/sync/sender/manage/SyncFileManager.java | 28 ++++++---- .../sender/recover/ISyncSenderLogAnalyzer.java | 13 +++++ .../db/sync/sender/recover/ISyncSenderLogger.java | 4 +- .../sync/sender/recover/SyncSenderLogAnalyzer.java | 12 ++--- .../sync/sender/transfer/DataTransferManager.java | 63 ++++++++++------------ .../db/sync/receiver/load/FileLoaderTest.java | 12 ++--- .../recover/SyncReceiverLogAnalyzerTest.java | 18 +++---- .../receiver/recover/SyncReceiverLoggerTest.java | 8 +-- .../db/sync/sender/manage/SyncFileManagerTest.java | 4 +- .../sender/recover/SyncSenderLogAnalyzerTest.java | 8 +-- .../sync/sender/recover/SyncSenderLoggerTest.java | 6 +-- .../sender/transfer/DataTransferManagerTest.java | 4 +- 25 files changed, 228 insertions(+), 139 deletions(-) diff --git a/server/src/assembly/resources/conf/iotdb-sync-client.properties b/server/src/assembly/resources/conf/iotdb-sync-client.properties index 3606ab4..786637a 100644 --- a/server/src/assembly/resources/conf/iotdb-sync-client.properties +++ b/server/src/assembly/resources/conf/iotdb-sync-client.properties @@ -25,3 +25,8 @@ server_port=5555 # The period time of sync process, the time unit is second. sync_period_in_second=600 + +# This parameter represents storage groups that participate in the synchronization task, which distinguishes each storage group by comma. +# If the list is empty, it means that all storage groups participate in synchronization. +# By default, it is empty list. +sync_storage_groups = root.sg1, root.sg2 diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java index c6e06cd..c9efe93 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java @@ -50,13 +50,15 @@ import org.slf4j.LoggerFactory; public class SyncServerManager implements IService { private static final Logger logger = LoggerFactory.getLogger(SyncServerManager.class); - private Thread syncServerThread; + private IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig(); + private Thread syncServerThread; + private SyncServerManager() { } - public static final SyncServerManager getInstance() { + public static SyncServerManager getInstance() { return ServerManagerHolder.INSTANCE; } diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java index 520b42d..2fa47d9 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java @@ -24,7 +24,7 @@ import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.TsFileProcessorException; -import org.apache.iotdb.db.sync.sender.conf.Constans; +import org.apache.iotdb.db.sync.sender.conf.SyncConstant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +49,7 @@ public class FileLoader implements IFileLoader { private FileLoader(String senderName, String syncFolderPath) throws IOException { this.senderName = senderName; this.syncFolderPath = syncFolderPath; - this.loadLog = new LoadLogger(new File(syncFolderPath, Constans.LOAD_LOG_NAME)); + this.loadLog = new LoadLogger(new File(syncFolderPath, SyncConstant.LOAD_LOG_NAME)); FileLoaderManager.getInstance().addFileLoader(senderName, this); FileLoaderManager.getInstance().addLoadTaskRunner(loadTaskRunner); } @@ -151,9 +151,9 @@ public class FileLoader implements IFileLoader { public void cleanUp() { try { loadLog.close(); - new File(syncFolderPath, Constans.SYNC_LOG_NAME).delete(); - new File(syncFolderPath, Constans.LOAD_LOG_NAME).delete(); - FileUtils.deleteDirectory(new File(syncFolderPath, Constans.RECEIVER_DATA_FOLDER_NAME)); + new File(syncFolderPath, SyncConstant.SYNC_LOG_NAME).delete(); + new File(syncFolderPath, SyncConstant.LOAD_LOG_NAME).delete(); + FileUtils.deleteDirectory(new File(syncFolderPath, SyncConstant.RECEIVER_DATA_FOLDER_NAME)); FileLoaderManager.getInstance().removeFileLoader(senderName); } catch (IOException e) { LOGGER.error("Can not clean up sync resource.", e); diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java index 55187d3..bf57c7c 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java @@ -27,6 +27,9 @@ import org.apache.iotdb.db.concurrent.ThreadName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * This class is to manage all FileLoader. + */ public class FileLoaderManager { private static final Logger LOGGER = LoggerFactory.getLogger(FileLoaderManager.class); @@ -37,7 +40,6 @@ public class FileLoaderManager { private ExecutorService loadTaskRunnerPool; - private FileLoaderManager() { } diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java index 69f6fda..9d6d1e1 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java @@ -22,14 +22,33 @@ import java.io.File; import java.io.IOException; import org.apache.iotdb.db.sync.receiver.load.FileLoader.LoadTask; +/** + * This interface is used to load files, including deleted files and new tsfiles. The + * producer-consumer model is used to load files. A background consumer thread is used to load + * files. There is a queue recording task. After receiving a file, the receiver adds a task to the + * queue. When all files are loaded and the synchronization task is completed, the thread is + * closed. + */ public interface IFileLoader { + /** + * Add a deleted file name to be loaded. + */ void addDeletedFileName(File deletedFile); + /** + * Add a new tsfile to be loaded. + */ void addTsfile(File tsfile); + /** + * Mark sync end. + */ void endSync(); + /** + * Handle load task by type. + */ void handleLoadTask(LoadTask task) throws IOException; void cleanUp(); diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoadLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoadLogger.java index 15fd8f5..ece5a07 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoadLogger.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoadLogger.java @@ -21,14 +21,37 @@ package org.apache.iotdb.db.sync.receiver.load; import java.io.File; import java.io.IOException; +/** + * This interface is used to log progress in the process of loading deleted files and new tsfiles. + * If the loading tasks are completed normally and there are no exceptions, the log records will be + * deleted; otherwise, the status can be restored according to the log at the start of each task. It + * ensures the correctness of synchronization module when system crash or network abnormality + * occur. + */ public interface ILoadLogger { + /** + * Start to load deleted files. + */ void startLoadDeletedFiles() throws IOException; + /** + * After a deleted file is loaded, record it in load log. + * + * @param file deleted file to be loaded + */ void finishLoadDeletedFile(File file) throws IOException; + /** + * Start to load tsfiles + */ void startLoadTsFiles() throws IOException; + /** + * After a new tsfile is loaded, record it in load log. + * + * @param file new tsfile to be loaded + */ void finishLoadTsfile(File file) throws IOException; void close() throws IOException; diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogger.java index 8bfa200..125fbc0 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogger.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogger.java @@ -22,30 +22,28 @@ import java.io.File; import java.io.IOException; public interface ISyncReceiverLogger { + /** - * Start sync deleted files name - * @throws IOException + * Start to sync deleted files name */ void startSyncDeletedFilesName() throws IOException; /** * After a deleted file name is synced to the receiver end, record it in sync log. + * * @param file the deleted tsfile - * @throws IOException */ void finishSyncDeletedFileName(File file) throws IOException; /** - * Start sync new tsfiles - * @throws IOException + * Start to sync new tsfiles */ void startSyncTsFiles() throws IOException; /** - * * After a new tsfile is synced to the receiver end, record it in sync log. + * * @param file new tsfile - * @throws IOException */ void finishSyncTsfile(File file) throws IOException; diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java index f2516c4..375c8a1 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java @@ -27,7 +27,7 @@ import org.apache.iotdb.db.sync.receiver.load.FileLoader; import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager; import org.apache.iotdb.db.sync.receiver.load.LoadLogger; import org.apache.iotdb.db.sync.receiver.load.LoadType; -import org.apache.iotdb.db.sync.sender.conf.Constans; +import org.apache.iotdb.db.sync.sender.conf.SyncConstant; import org.apache.iotdb.db.utils.FilePathUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,11 +49,11 @@ public class SyncReceiverLogAnalyzer implements ISyncReceiverLogAnalyzer { String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs(); LOGGER.info("Start to recover all sync state for sync receiver."); for (String dataDir : dataDirs) { - if (!new File(FilePathUtils.regularizePath(dataDir) + Constans.SYNC_RECEIVER).exists()) { + if (!new File(FilePathUtils.regularizePath(dataDir) + SyncConstant.SYNC_RECEIVER).exists()) { continue; } for (File syncFolder : new File( - FilePathUtils.regularizePath(dataDir) + Constans.SYNC_RECEIVER) + FilePathUtils.regularizePath(dataDir) + SyncConstant.SYNC_RECEIVER) .listFiles()) { recover(syncFolder); } @@ -63,16 +63,16 @@ public class SyncReceiverLogAnalyzer implements ISyncReceiverLogAnalyzer { private boolean recover(File senderFolder) throws IOException { // check the state - if (!new File(senderFolder, Constans.SYNC_LOG_NAME).exists()) { - new File(senderFolder, Constans.LOAD_LOG_NAME).delete(); + if (!new File(senderFolder, SyncConstant.SYNC_LOG_NAME).exists()) { + new File(senderFolder, SyncConstant.LOAD_LOG_NAME).delete(); return true; } if (FileLoaderManager.getInstance().containsFileLoader(senderFolder.getName())) { FileLoaderManager.getInstance().getFileLoader(senderFolder.getName()).endSync(); } else { scanLogger(FileLoader.createFileLoader(senderFolder), - new File(senderFolder, Constans.SYNC_LOG_NAME), - new File(senderFolder, Constans.LOAD_LOG_NAME)); + new File(senderFolder, SyncConstant.SYNC_LOG_NAME), + new File(senderFolder, SyncConstant.LOAD_LOG_NAME)); } return FileLoaderManager.getInstance().containsFileLoader(senderFolder.getName()); } @@ -82,11 +82,11 @@ public class SyncReceiverLogAnalyzer implements ISyncReceiverLogAnalyzer { String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs(); boolean recoverComplete = true; for (String dataDir : dataDirs) { - if (!new File(FilePathUtils.regularizePath(dataDir) + Constans.SYNC_RECEIVER).exists()) { + if (!new File(FilePathUtils.regularizePath(dataDir) + SyncConstant.SYNC_RECEIVER).exists()) { continue; } for (File syncFolder : new File( - FilePathUtils.regularizePath(dataDir) + Constans.SYNC_RECEIVER) + FilePathUtils.regularizePath(dataDir) + SyncConstant.SYNC_RECEIVER) .listFiles()) { if (syncFolder.getName().equals(senderName)) { recoverComplete &= recover(syncFolder); diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java index 12ab86b..8a6064a 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java @@ -46,7 +46,7 @@ import org.apache.iotdb.db.sync.receiver.load.FileLoader; import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager; import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogAnalyzer; import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogger; -import org.apache.iotdb.db.sync.sender.conf.Constans; +import org.apache.iotdb.db.sync.sender.conf.SyncConstant; import org.apache.iotdb.db.utils.FilePathUtils; import org.apache.iotdb.db.utils.SyncUtils; import org.apache.iotdb.service.sync.thrift.ResultStatus; @@ -86,7 +86,7 @@ public class SyncServiceImpl implements SyncService.Iface { public ResultStatus check(String ipAddress, String uuid) { Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName()); if (SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress)) { - senderName.set(ipAddress + Constans.SYNC_DIR_NAME_SEPARATOR + uuid); + senderName.set(ipAddress + SyncConstant.SYNC_DIR_NAME_SEPARATOR + uuid); if (checkRecovery()) { return getSuccessResult(); } else { @@ -117,7 +117,7 @@ public class SyncServiceImpl implements SyncService.Iface { initPath(); currentSG.remove(); FileLoader.createFileLoader(senderName.get(), syncFolderPath.get()); - syncLog.set(new SyncReceiverLogger(new File(getSyncDataPath(), Constans.SYNC_LOG_NAME))); + syncLog.set(new SyncReceiverLogger(new File(getSyncDataPath(), SyncConstant.SYNC_LOG_NAME))); return getSuccessResult(); } catch (DiskSpaceInsufficientException | IOException e) { logger.error("Can not receiver data from sender", e); @@ -132,7 +132,7 @@ public class SyncServiceImpl implements SyncService.Iface { String dataDir = new File(DirectoryManager.getInstance().getNextFolderForSequenceFile()) .getParentFile().getAbsolutePath(); syncFolderPath - .set(FilePathUtils.regularizePath(dataDir) + Constans.SYNC_RECEIVER + File.separatorChar + .set(FilePathUtils.regularizePath(dataDir) + SyncConstant.SYNC_RECEIVER + File.separatorChar + senderName.get()); } @@ -141,7 +141,7 @@ public class SyncServiceImpl implements SyncService.Iface { */ @Override public ResultStatus init(String storageGroup) { - logger.info("Sync process starts to receive data of storage group {}", storageGroup); + logger.info("Sync process started to receive data of storage group {}", storageGroup); currentSG.set(storageGroup); try { syncLog.get().startSyncDeletedFilesName(); @@ -307,6 +307,7 @@ public class SyncServiceImpl implements SyncService.Iface { try { syncLog.get().close(); FileLoaderManager.getInstance().getFileLoader(senderName.get()).endSync(); + logger.info("Sync process for data of storage group {} ended.", currentSG.get()); } catch (IOException e) { logger.error("Can not end sync", e); return getErrorResult(String.format("Can not end sync because %s", e.getMessage())); @@ -315,7 +316,7 @@ public class SyncServiceImpl implements SyncService.Iface { } private String getSyncDataPath() { - return syncFolderPath.get() + File.separatorChar + Constans.RECEIVER_DATA_FOLDER_NAME; + return syncFolderPath.get() + File.separatorChar + SyncConstant.RECEIVER_DATA_FOLDER_NAME; } private ResultStatus getSuccessResult() { diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncConstant.java similarity index 95% rename from server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java rename to server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncConstant.java index d711aa5..f73151a 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncConstant.java @@ -18,13 +18,15 @@ */ package org.apache.iotdb.db.sync.sender.conf; -public class Constans { +public class SyncConstant { - private Constans() { + private SyncConstant() { } public static final String CONFIG_NAME = "iotdb-sync-client.properties"; + public static final String SYNC_SENDER = "sync-sender"; + public static final String SYNC_RECEIVER = "sync-receiver"; public static final String MESSAGE_DIGIT_NAME = "MD5"; @@ -32,7 +34,7 @@ public class Constans { public static final String SYNC_DIR_NAME_SEPARATOR = "_"; /** - * Split data file , block size at each transmission + * Split data file, block size at each transmission **/ public static final int DATA_CHUNK_SIZE = 64 * 1024 * 1024; diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java index 4a0ae8b..cc6f11c 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.sync.sender.conf; import java.io.File; +import java.util.List; public class SyncSenderConfig { @@ -34,21 +35,27 @@ public class SyncSenderConfig { private String uuidPath; - private String lastFileInfo; + private String lastFileInfoPath; private String snapshotPath; /** + * Storage groups which participate in sync process + */ + private List<String> storageGroupList; + + /** * Update paths based on data directory */ public void update(String dataDirectory) { - senderFolderPath = dataDirectory + File.separatorChar + Constans.SYNC_SENDER + File.separatorChar + - getSyncReceiverName(); - lockFilePath = senderFolderPath + File.separatorChar + Constans.LOCK_FILE_NAME; - uuidPath = senderFolderPath + File.separatorChar + Constans.UUID_FILE_NAME; - lastFileInfo = senderFolderPath + File.separatorChar + Constans.LAST_LOCAL_FILE_NAME; - snapshotPath = senderFolderPath + File.separatorChar + Constans.DATA_SNAPSHOT_NAME; - if(!new File(snapshotPath).exists()){ + senderFolderPath = + dataDirectory + File.separatorChar + SyncConstant.SYNC_SENDER + File.separatorChar + + getSyncReceiverName(); + lockFilePath = senderFolderPath + File.separatorChar + SyncConstant.LOCK_FILE_NAME; + uuidPath = senderFolderPath + File.separatorChar + SyncConstant.UUID_FILE_NAME; + lastFileInfoPath = senderFolderPath + File.separatorChar + SyncConstant.LAST_LOCAL_FILE_NAME; + snapshotPath = senderFolderPath + File.separatorChar + SyncConstant.DATA_SNAPSHOT_NAME; + if (!new File(snapshotPath).exists()) { new File(snapshotPath).mkdirs(); } } @@ -93,12 +100,12 @@ public class SyncSenderConfig { this.lockFilePath = lockFilePath; } - public String getLastFileInfo() { - return lastFileInfo; + public String getLastFileInfoPath() { + return lastFileInfoPath; } - public void setLastFileInfo(String lastFileInfo) { - this.lastFileInfo = lastFileInfo; + public void setLastFileInfoPath(String lastFileInfoPath) { + this.lastFileInfoPath = lastFileInfoPath; } public String getSnapshotPath() { @@ -114,6 +121,14 @@ public class SyncSenderConfig { } public String getSyncReceiverName() { - return serverIp + Constans.SYNC_DIR_NAME_SEPARATOR + serverPort; + return serverIp + SyncConstant.SYNC_DIR_NAME_SEPARATOR + serverPort; + } + + public List<String> getStorageGroupList() { + return storageGroupList; + } + + public void setStorageGroupList(List<String> storageGroupList) { + this.storageGroupList = storageGroupList; } } diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderDescriptor.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderDescriptor.java index df1b834..a745156 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderDescriptor.java @@ -23,6 +23,9 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Properties; import org.apache.iotdb.db.conf.IoTDBConstant; import org.slf4j.Logger; @@ -59,15 +62,15 @@ public class SyncSenderDescriptor { url = System.getProperty(IoTDBConstant.IOTDB_HOME, null); if (url != null) { url = url + File.separatorChar + "conf" + File.separatorChar - + Constans.CONFIG_NAME; + + SyncConstant.CONFIG_NAME; } else { logger.warn( "Cannot find IOTDB_HOME or IOTDB_CONF environment variable when loading config file {}, use default configuration", - Constans.CONFIG_NAME); + SyncConstant.CONFIG_NAME); return; } } else { - url += (File.separatorChar + Constans.CONFIG_NAME); + url += (File.separatorChar + SyncConstant.CONFIG_NAME); } try { @@ -88,10 +91,17 @@ public class SyncSenderDescriptor { conf.setSyncPeriodInSecond(Integer.parseInt(properties .getProperty("sync_period_in_second", Integer.toString(conf.getSyncPeriodInSecond())))); + String storageGroups = properties.getProperty("sync_storage_groups", null); + if (storageGroups != null) { + String[] splits = storageGroups.split(","); + List<String> storageGroupList = new ArrayList<>(); + Arrays.stream(splits).forEach(sg -> storageGroupList.add(sg.trim())); + conf.setStorageGroupList(storageGroupList); + } } catch (IOException e) { - logger.warn("Cannot load config file, use default configuration.", e); + logger.warn("Cannot load sync config file, use default sync configuration.", e); } catch (Exception e) { - logger.warn("Error format in config file, use default configuration.", e); + logger.warn("Error format in sync config file, use default sync configuration.", e); } finally { try { inputStream.close(); diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java index 684db79..1255337 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java @@ -22,15 +22,15 @@ import java.io.File; import java.io.IOException; /** - * This interface is used to manage deleted files and new closed files that need to be synchronized in each - * sync task. + * This interface is used to manage deleted files and new closed files that need to be synchronized + * in each sync task. */ public interface ISyncFileManager { /** * Find out all closed and unmodified files, which means there has a .resource file and doesn't - * have a .mod file. For these files, they will eventually generate a new tsfile file as the merge - * operation is executed and executed in subsequent synchronization tasks. + * have a .mod file and .merge file. For these files, they will eventually generate a new tsfile + * file as the merge operation is executed and executed in subsequent synchronization tasks. * * @param dataDir data directory */ diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java index 89ea93d..af4fac6 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.apache.iotdb.db.conf.IoTDBConstant; +import org.apache.iotdb.db.engine.merge.task.MergeTask; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor; @@ -45,25 +46,25 @@ public class SyncFileManager implements ISyncFileManager { private Set<String> allSG; /** - * Key is storage group, value is the set of current sealed tsfile in the sg. + * Key is storage group, value is the set of current sealed tsfile in the storage group. */ private Map<String, Set<File>> currentSealedLocalFilesMap; /** - * Key is storage group, value is the set of last local tsfiles in the sg, which don't contains - * those tsfiles which are not synced successfully. + * Key is storage group, value is the set of last local tsfiles in the storage group, which don't + * contains those tsfiles which are not synced successfully. */ private Map<String, Set<File>> lastLocalFilesMap; /** * Key is storage group, value is the valid set of deleted tsfiles which need to be synced to - * receiver end in the sg. + * receiver end in the storage group. */ private Map<String, Set<File>> deletedFilesMap; /** * Key is storage group, value is the valid set of new tsfiles which need to be synced to receiver - * end in the sg. + * end in the storage group. */ private Map<String, Set<File>> toBeSyncedFilesMap; @@ -91,9 +92,11 @@ public class SyncFileManager implements ISyncFileManager { for (File sgFolder : allSGFolders) { allSG.add(sgFolder.getName()); currentAllLocalFiles.putIfAbsent(sgFolder.getName(), new HashSet<>()); - Arrays.stream(sgFolder.listFiles()) - .forEach(file -> currentAllLocalFiles.get(sgFolder.getName()) - .add(new File(sgFolder.getAbsolutePath(), file.getName()))); + if (sgFolder.listFiles() != null) { + Arrays.stream(sgFolder.listFiles()) + .forEach(file -> currentAllLocalFiles.get(sgFolder.getName()) + .add(new File(sgFolder.getAbsolutePath(), file.getName()))); + } } // get sealed tsfiles @@ -102,11 +105,13 @@ public class SyncFileManager implements ISyncFileManager { currentSealedLocalFilesMap.putIfAbsent(sgName, new HashSet<>()); for (File file : entry.getValue()) { if (file.getName().endsWith(ModificationFile.FILE_SUFFIX) || file.getName() - .endsWith(TsFileResource.RESOURCE_SUFFIX)) { + .endsWith(TsFileResource.RESOURCE_SUFFIX) || file.getName() + .endsWith(MergeTask.MERGE_SUFFIX)) { continue; } if (new File(file.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).exists() && !new File( - file.getAbsolutePath() + ModificationFile.FILE_SUFFIX).exists()) { + file.getAbsolutePath() + ModificationFile.FILE_SUFFIX).exists() && !new File( + file.getAbsolutePath() + MergeTask.MERGE_SUFFIX).exists()) { currentSealedLocalFilesMap.get(sgName).add(file); } } @@ -136,7 +141,8 @@ public class SyncFileManager implements ISyncFileManager { public void getValidFiles(String dataDir) throws IOException { allSG = new HashSet<>(); getCurrentLocalFiles(dataDir); - getLastLocalFiles(new File(SyncSenderDescriptor.getInstance().getConfig().getLastFileInfo())); + getLastLocalFiles( + new File(SyncSenderDescriptor.getInstance().getConfig().getLastFileInfoPath())); toBeSyncedFilesMap = new HashMap<>(); deletedFilesMap = new HashMap<>(); for (String sgName : allSG) { diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java index 2f2177b..084a8d4 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java @@ -28,12 +28,25 @@ import java.util.Set; */ public interface ISyncSenderLogAnalyzer { + /** + * Recover sync tasks that were not completed properly last time, and clean up the environment. + */ void recover() throws IOException; + /** + * Load last local files from last local info file. + */ void loadLastLocalFiles(Set<String> lastLocalFiles); + /** + * Load the sync log, which indicates the progress of the last synchronization task. + * Deleted files and new tsfiles can be obtained by log analysis. + */ void loadLogger(Set<String> deletedFiles, Set<String> newFiles); + /** + * Update the last local info file based on the log information of the last task + */ void updateLastLocalFile(Set<String> currentLocalFiles) throws IOException; } diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java index 0229579..2b7a243 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java @@ -31,7 +31,7 @@ import java.io.IOException; public interface ISyncSenderLogger { /** - * Start sync deleted files name + * Start to sync deleted files name * @throws IOException */ void startSyncDeletedFilesName() throws IOException; @@ -44,7 +44,7 @@ public interface ISyncSenderLogger { void finishSyncDeletedFileName(File file) throws IOException; /** - * Start sync new tsfiles + * Start to sync new tsfiles * @throws IOException */ void startSyncTsFiles() throws IOException; diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java index d4c65ef..9531da4 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java @@ -27,7 +27,7 @@ import java.io.IOException; import java.util.HashSet; import java.util.Set; import org.apache.commons.io.FileUtils; -import org.apache.iotdb.db.sync.sender.conf.Constans; +import org.apache.iotdb.db.sync.sender.conf.SyncConstant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,15 +41,15 @@ public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer { public SyncSenderLogAnalyzer(String senderPath) { this.senderPath = senderPath; - this.currentLocalFile = new File(senderPath, Constans.CURRENT_LOCAL_FILE_NAME); - this.lastLocalFile = new File(senderPath, Constans.LAST_LOCAL_FILE_NAME); - this.syncLogFile = new File(senderPath, Constans.SYNC_LOG_NAME); + this.currentLocalFile = new File(senderPath, SyncConstant.CURRENT_LOCAL_FILE_NAME); + this.lastLocalFile = new File(senderPath, SyncConstant.LAST_LOCAL_FILE_NAME); + this.syncLogFile = new File(senderPath, SyncConstant.SYNC_LOG_NAME); } @Override public void recover() throws IOException { if (currentLocalFile.exists() && !lastLocalFile.exists()) { - currentLocalFile.renameTo(lastLocalFile); + FileUtils.moveFile(currentLocalFile, lastLocalFile); } else { Set<String> lastLocalFiles = new HashSet<>(); Set<String> deletedFiles = new HashSet<>(); @@ -60,7 +60,7 @@ public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer { lastLocalFiles.addAll(newFiles); updateLastLocalFile(lastLocalFiles); } - FileUtils.deleteDirectory(new File(senderPath, Constans.DATA_SNAPSHOT_NAME)); + FileUtils.deleteDirectory(new File(senderPath, SyncConstant.DATA_SNAPSHOT_NAME)); syncLogFile.delete(); } diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java index 39ebe20..0952fe5 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java @@ -34,6 +34,7 @@ import java.nio.file.Path; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -46,7 +47,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.SyncConnectionException; import org.apache.iotdb.db.metadata.MetadataConstant; -import org.apache.iotdb.db.sync.sender.conf.Constans; +import org.apache.iotdb.db.sync.sender.conf.SyncConstant; import org.apache.iotdb.db.sync.sender.conf.SyncSenderConfig; import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor; import org.apache.iotdb.db.sync.sender.manage.SyncFileManager; @@ -65,9 +66,6 @@ import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * SyncSenderImpl is used to transfer tsfiles that needs to sync to receiver. - */ public class DataTransferManager implements IDataTransferManager { private static final Logger logger = LoggerFactory.getLogger(DataTransferManager.class); @@ -191,7 +189,7 @@ public class DataTransferManager implements IDataTransferManager { if (syncStatus) { logger.info("Sync process for receiver {} is in execution!", config.getSyncReceiverName()); } - }, Constans.SYNC_MONITOR_DELAY, Constans.SYNC_MONITOR_PERIOD, TimeUnit.SECONDS); + }, SyncConstant.SYNC_MONITOR_DELAY, SyncConstant.SYNC_MONITOR_PERIOD, TimeUnit.SECONDS); } /** @@ -205,7 +203,7 @@ public class DataTransferManager implements IDataTransferManager { logger.error("Sync failed", e); stop(); } - }, Constans.SYNC_PROCESS_DELAY, Constans.SYNC_PROCESS_PERIOD, TimeUnit.SECONDS); + }, SyncConstant.SYNC_PROCESS_DELAY, SyncConstant.SYNC_PROCESS_PERIOD, TimeUnit.SECONDS); } @Override @@ -300,7 +298,6 @@ public class DataTransferManager implements IDataTransferManager { } if (!file.exists()) { try (FileOutputStream out = new FileOutputStream(file)) { - file.createNewFile(); uuid = generateUUID(); out.write(uuid.getBytes()); } catch (IOException e) { @@ -327,18 +324,15 @@ public class DataTransferManager implements IDataTransferManager { int retryCount = 0; serviceClient.initSyncData(MetadataConstant.METADATA_LOG); while (true) { - if (retryCount > Constans.MAX_SYNC_FILE_TRY) { + if (retryCount > SyncConstant.MAX_SYNC_FILE_TRY) { throw new SyncConnectionException(String - .format("Can not sync schema after %s retries.", Constans.MAX_SYNC_FILE_TRY)); + .format("Can not sync schema after %s retries.", SyncConstant.MAX_SYNC_FILE_TRY)); } - try { - if (tryToSyncSchema()) { - writeSyncSchemaPos(getSchemaPosFile()); - break; - } - } finally { - retryCount++; + if (tryToSyncSchema()) { + writeSyncSchemaPos(getSchemaPosFile()); + break; } + retryCount++; } } @@ -347,12 +341,12 @@ public class DataTransferManager implements IDataTransferManager { // start to sync file data and get md5 of this file. try (BufferedReader br = new BufferedReader(new FileReader(getSchemaLogFile())); - ByteArrayOutputStream bos = new ByteArrayOutputStream(Constans.DATA_CHUNK_SIZE)) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(SyncConstant.DATA_CHUNK_SIZE)) { schemaFileLinePos = 0; while (schemaFileLinePos++ <= schemaPos) { br.readLine(); } - MessageDigest md = MessageDigest.getInstance(Constans.MESSAGE_DIGIT_NAME); + MessageDigest md = MessageDigest.getInstance(SyncConstant.MESSAGE_DIGIT_NAME); String line; int cntLine = 0; while ((line = br.readLine()) != null) { @@ -435,7 +429,11 @@ public class DataTransferManager implements IDataTransferManager { try { syncStatus = true; + List<String> storageGroups = config.getStorageGroupList(); for (String sgName : allSG) { + if (!storageGroups.isEmpty() && !storageGroups.contains(sgName)) { + continue; + } lastLocalFilesMap.putIfAbsent(sgName, new HashSet<>()); syncLog = new SyncSenderLogger(getSyncLogFile()); try { @@ -496,9 +494,8 @@ public class DataTransferManager implements IDataTransferManager { int cnt = 0; for (File tsfile : toBeSyncFiles) { cnt++; - File snapshotFile = null; try { - snapshotFile = makeFileSnapshot(tsfile); + File snapshotFile = makeFileSnapshot(tsfile); // firstly sync .restore file, then sync tsfile syncSingleFile(new File(snapshotFile, TsFileResource.RESOURCE_SUFFIX)); syncSingleFile(snapshotFile); @@ -509,10 +506,6 @@ public class DataTransferManager implements IDataTransferManager { logger.info( "Tsfile {} can not make snapshot, so skip the tsfile and continue to sync other tsfiles", tsfile, e); - } finally { - if (snapshotFile != null) { - snapshotFile.delete(); - } } } logger.info("Sync process has finished storage group {}.", sgName); @@ -545,21 +538,21 @@ public class DataTransferManager implements IDataTransferManager { private void syncSingleFile(File snapshotFile) throws SyncConnectionException { try { int retryCount = 0; - MessageDigest md = MessageDigest.getInstance(Constans.MESSAGE_DIGIT_NAME); + MessageDigest md = MessageDigest.getInstance(SyncConstant.MESSAGE_DIGIT_NAME); serviceClient.initSyncData(snapshotFile.getName()); outer: while (true) { retryCount++; - if (retryCount > Constans.MAX_SYNC_FILE_TRY) { + if (retryCount > SyncConstant.MAX_SYNC_FILE_TRY) { throw new SyncConnectionException(String .format("Can not sync file %s after %s tries.", snapshotFile.getAbsoluteFile(), - Constans.MAX_SYNC_FILE_TRY)); + SyncConstant.MAX_SYNC_FILE_TRY)); } md.reset(); - byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE]; + byte[] buffer = new byte[SyncConstant.DATA_CHUNK_SIZE]; int dataLength; try (FileInputStream fis = new FileInputStream(snapshotFile); - ByteArrayOutputStream bos = new ByteArrayOutputStream(Constans.DATA_CHUNK_SIZE)) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(SyncConstant.DATA_CHUNK_SIZE)) { while ((dataLength = fis.read(buffer)) != -1) { // cut the file into pieces to send bos.write(buffer, 0, dataLength); md.update(buffer, 0, dataLength); @@ -589,9 +582,9 @@ public class DataTransferManager implements IDataTransferManager { } } - private void endSync() { + private void endSync() throws IOException { File currentLocalFile = getCurrentLogFile(); - File lastLocalFile = new File(config.getLastFileInfo()); + File lastLocalFile = new File(config.getLastFileInfoPath()); // 1. Write file list to currentLocalFile try (BufferedWriter bw = new BufferedWriter(new FileWriter(currentLocalFile))) { @@ -608,7 +601,7 @@ public class DataTransferManager implements IDataTransferManager { // 2. Rename currentLocalFile to lastLocalFile lastLocalFile.delete(); - currentLocalFile.renameTo(lastLocalFile); + FileUtils.moveFile(currentLocalFile, lastLocalFile); // 3. delete snapshot directory try { @@ -623,7 +616,7 @@ public class DataTransferManager implements IDataTransferManager { private File getSchemaPosFile() { - return new File(config.getSenderFolderPath(), Constans.SCHEMA_POS_FILE_NAME); + return new File(config.getSenderFolderPath(), SyncConstant.SCHEMA_POS_FILE_NAME); } private File getSchemaLogFile() { @@ -637,11 +630,11 @@ public class DataTransferManager implements IDataTransferManager { } private File getSyncLogFile() { - return new File(config.getSenderFolderPath(), Constans.SYNC_LOG_NAME); + return new File(config.getSenderFolderPath(), SyncConstant.SYNC_LOG_NAME); } private File getCurrentLogFile() { - return new File(config.getSenderFolderPath(), Constans.CURRENT_LOCAL_FILE_NAME); + return new File(config.getSenderFolderPath(), SyncConstant.CURRENT_LOCAL_FILE_NAME); } public void setConfig(SyncSenderConfig config) { diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java index 4354484..4f31c57 100644 --- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java +++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java @@ -37,7 +37,7 @@ import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.service.IoTDB; -import org.apache.iotdb.db.sync.sender.conf.Constans; +import org.apache.iotdb.db.sync.sender.conf.SyncConstant; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.junit.After; import org.junit.Before; @@ -149,7 +149,7 @@ public class FileLoaderTest { LOGGER.error("Fail to wait for loading new tsfiles", e); } - assert !new File(getReceiverFolderFile(), Constans.RECEIVER_DATA_FOLDER_NAME).exists(); + assert !new File(getReceiverFolderFile(), SyncConstant.RECEIVER_DATA_FOLDER_NAME).exists(); Map<String, Set<File>> sequenceLoadedFileMap = new HashMap<>(); for (int i = 0; i < 3; i++) { StorageGroupProcessor processor = StorageEngine.getInstance().getProcessor(SG_NAME + i); @@ -235,7 +235,7 @@ public class FileLoaderTest { LOGGER.error("Fail to wait for loading new tsfiles", e); } - assert !new File(getReceiverFolderFile(), Constans.RECEIVER_DATA_FOLDER_NAME).exists(); + assert !new File(getReceiverFolderFile(), SyncConstant.RECEIVER_DATA_FOLDER_NAME).exists(); sequenceLoadedFileMap = new HashMap<>(); for (int i = 0; i < 3; i++) { StorageGroupProcessor processor = StorageEngine.getInstance().getProcessor(SG_NAME + i); @@ -339,7 +339,7 @@ public class FileLoaderTest { LOGGER.error("Fail to wait for loading new tsfiles", e); } - assert !new File(getReceiverFolderFile(), Constans.RECEIVER_DATA_FOLDER_NAME).exists(); + assert !new File(getReceiverFolderFile(), SyncConstant.RECEIVER_DATA_FOLDER_NAME).exists(); Map<String, Set<File>> loadedFileMap = new HashMap<>(); for (int i = 0; i < 3; i++) { StorageGroupProcessor processor = StorageEngine.getInstance().getProcessor(SG_NAME + i); @@ -414,11 +414,11 @@ public class FileLoaderTest { } private File getReceiverFolderFile() { - return new File(dataDir + File.separatorChar + Constans.SYNC_RECEIVER + File.separatorChar + return new File(dataDir + File.separatorChar + SyncConstant.SYNC_RECEIVER + File.separatorChar + "127.0.0.1_5555"); } private File getSnapshotFolder() { - return new File(getReceiverFolderFile(), Constans.RECEIVER_DATA_FOLDER_NAME); + return new File(getReceiverFolderFile(), SyncConstant.RECEIVER_DATA_FOLDER_NAME); } } \ No newline at end of file diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java index 7a591a0..f2de4d0 100644 --- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java @@ -41,7 +41,7 @@ import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.sync.receiver.load.FileLoader; import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager; import org.apache.iotdb.db.sync.receiver.load.FileLoaderTest; -import org.apache.iotdb.db.sync.sender.conf.Constans; +import org.apache.iotdb.db.sync.sender.conf.SyncConstant; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.junit.After; import org.junit.Before; @@ -90,7 +90,7 @@ public class SyncReceiverLogAnalyzerTest { @Test public void recover() throws IOException, StorageEngineException { receiverLogger = new SyncReceiverLogger( - new File(getReceiverFolderFile(), Constans.SYNC_LOG_NAME)); + new File(getReceiverFolderFile(), SyncConstant.SYNC_LOG_NAME)); fileLoader = FileLoader.createFileLoader(getReceiverFolderFile()); Map<String, Set<File>> allFileList = new HashMap<>(); Map<String, Set<File>> correctSequenceLoadedFileMap = new HashMap<>(); @@ -151,13 +151,13 @@ public class SyncReceiverLogAnalyzerTest { } receiverLogger.close(); - assert new File(getReceiverFolderFile(), Constans.LOAD_LOG_NAME).exists(); - assert new File(getReceiverFolderFile(), Constans.SYNC_LOG_NAME).exists(); + assert new File(getReceiverFolderFile(), SyncConstant.LOAD_LOG_NAME).exists(); + assert new File(getReceiverFolderFile(), SyncConstant.SYNC_LOG_NAME).exists(); assert FileLoaderManager.getInstance().containsFileLoader(getReceiverFolderFile().getName()); int count = 0, mode = 0; Set<String> toBeSyncedFilesTest = new HashSet<>(); try (BufferedReader br = new BufferedReader( - new FileReader(new File(getReceiverFolderFile(), Constans.SYNC_LOG_NAME)))) { + new FileReader(new File(getReceiverFolderFile(), SyncConstant.SYNC_LOG_NAME)))) { String line; while ((line = br.readLine()) != null) { count++; @@ -189,17 +189,17 @@ public class SyncReceiverLogAnalyzerTest { LOGGER.error("Fail to wait for loading new tsfiles", e); } - assert !new File(getReceiverFolderFile(), Constans.LOAD_LOG_NAME).exists(); - assert !new File(getReceiverFolderFile(), Constans.SYNC_LOG_NAME).exists(); + assert !new File(getReceiverFolderFile(), SyncConstant.LOAD_LOG_NAME).exists(); + assert !new File(getReceiverFolderFile(), SyncConstant.SYNC_LOG_NAME).exists(); } private File getReceiverFolderFile() { - return new File(dataDir + File.separatorChar + Constans.SYNC_RECEIVER + File.separatorChar + return new File(dataDir + File.separatorChar + SyncConstant.SYNC_RECEIVER + File.separatorChar + "127.0.0.1_5555"); } private File getSnapshotFolder() { - return new File(getReceiverFolderFile(), Constans.RECEIVER_DATA_FOLDER_NAME); + return new File(getReceiverFolderFile(), SyncConstant.RECEIVER_DATA_FOLDER_NAME); } } \ No newline at end of file diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLoggerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLoggerTest.java index c6b248d..86ca96c 100644 --- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLoggerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLoggerTest.java @@ -28,7 +28,7 @@ import org.apache.iotdb.db.conf.directories.DirectoryManager; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.exception.StorageEngineException; -import org.apache.iotdb.db.sync.sender.conf.Constans; +import org.apache.iotdb.db.sync.sender.conf.SyncConstant; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.junit.After; import org.junit.Before; @@ -55,7 +55,7 @@ public class SyncReceiverLoggerTest { @Test public void testSyncReceiverLogger() throws IOException { receiverLogger = new SyncReceiverLogger( - new File(getReceiverFolderFile(), Constans.SYNC_LOG_NAME)); + new File(getReceiverFolderFile(), SyncConstant.SYNC_LOG_NAME)); Set<String> deletedFileNames = new HashSet<>(); Set<String> deletedFileNamesTest = new HashSet<>(); receiverLogger.startSyncDeletedFilesName(); @@ -78,7 +78,7 @@ public class SyncReceiverLoggerTest { int count = 0; int mode = 0; try (BufferedReader br = new BufferedReader( - new FileReader(new File(getReceiverFolderFile(), Constans.SYNC_LOG_NAME)))) { + new FileReader(new File(getReceiverFolderFile(), SyncConstant.SYNC_LOG_NAME)))) { String line; while ((line = br.readLine()) != null) { count++; @@ -103,7 +103,7 @@ public class SyncReceiverLoggerTest { } private File getReceiverFolderFile() { - return new File(dataDir + File.separatorChar + Constans.SYNC_RECEIVER + File.separatorChar + return new File(dataDir + File.separatorChar + SyncConstant.SYNC_RECEIVER + File.separatorChar + "127.0.0.1_5555"); } } \ No newline at end of file diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java index 46f339c..6e9428f 100644 --- a/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java @@ -270,7 +270,7 @@ public class SyncFileManagerTest { private void updateLastLocalFiles(Map<String, Set<File>> lastLocalFilesMap) { try (BufferedWriter bw = new BufferedWriter( - new FileWriter(new File(config.getLastFileInfo())))) { + new FileWriter(new File(config.getLastFileInfoPath())))) { for (Set<File> currentLocalFiles : lastLocalFilesMap.values()) { for (File file : currentLocalFiles) { bw.write(file.getAbsolutePath()); @@ -279,7 +279,7 @@ public class SyncFileManagerTest { bw.flush(); } } catch (IOException e) { - logger.error("Can not clear sync log {}", config.getLastFileInfo(), e); + logger.error("Can not clear sync log {}", config.getLastFileInfoPath(), e); } } diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java index 71c9629..5040076 100644 --- a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java @@ -32,7 +32,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.exception.StorageEngineException; -import org.apache.iotdb.db.sync.sender.conf.Constans; +import org.apache.iotdb.db.sync.sender.conf.SyncConstant; import org.apache.iotdb.db.sync.sender.conf.SyncSenderConfig; import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor; import org.apache.iotdb.db.sync.sender.manage.SyncFileManager; @@ -61,7 +61,7 @@ public class SyncSenderLogAnalyzerTest { .getParentFile().getAbsolutePath(); config.update(dataDir); senderLogger = new SyncSenderLogger( - new File(config.getSenderFolderPath(), Constans.SYNC_LOG_NAME)); + new File(config.getSenderFolderPath(), SyncConstant.SYNC_LOG_NAME)); senderLogAnalyzer = new SyncSenderLogAnalyzer(config.getSenderFolderPath()); } @@ -120,9 +120,9 @@ public class SyncSenderLogAnalyzerTest { } // delete some files - assert !new File(config.getSenderFolderPath(), Constans.SYNC_LOG_NAME).exists(); + assert !new File(config.getSenderFolderPath(), SyncConstant.SYNC_LOG_NAME).exists(); senderLogger = new SyncSenderLogger( - new File(config.getSenderFolderPath(), Constans.SYNC_LOG_NAME)); + new File(config.getSenderFolderPath(), SyncConstant.SYNC_LOG_NAME)); manager.getValidFiles(dataDir); assert !isEmpty(manager.getLastLocalFilesMap()); senderLogger.startSyncDeletedFilesName(); diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLoggerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLoggerTest.java index 3fe2e33..0cb1205 100644 --- a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLoggerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLoggerTest.java @@ -28,7 +28,7 @@ import org.apache.iotdb.db.conf.directories.DirectoryManager; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.exception.StorageEngineException; -import org.apache.iotdb.db.sync.sender.conf.Constans; +import org.apache.iotdb.db.sync.sender.conf.SyncConstant; import org.apache.iotdb.db.sync.sender.conf.SyncSenderConfig; import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor; import org.apache.iotdb.db.utils.EnvironmentUtils; @@ -59,7 +59,7 @@ public class SyncSenderLoggerTest { @Test public void testSyncSenderLogger() throws IOException { senderLogger = new SyncSenderLogger( - new File(config.getSenderFolderPath(), Constans.SYNC_LOG_NAME)); + new File(config.getSenderFolderPath(), SyncConstant.SYNC_LOG_NAME)); Set<String> deletedFileNames = new HashSet<>(); Set<String> deletedFileNamesTest = new HashSet<>(); senderLogger.startSyncDeletedFilesName(); @@ -81,7 +81,7 @@ public class SyncSenderLoggerTest { int count = 0; int mode = 0; try (BufferedReader br = new BufferedReader( - new FileReader(new File(config.getSenderFolderPath(), Constans.SYNC_LOG_NAME)))) { + new FileReader(new File(config.getSenderFolderPath(), SyncConstant.SYNC_LOG_NAME)))) { String line; while ((line = br.readLine()) != null) { count++; diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManagerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManagerTest.java index ef49dd5..9c1302e 100644 --- a/server/src/test/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManagerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManagerTest.java @@ -127,9 +127,9 @@ public class DataTransferManagerTest { assert snapFileMap.get(sg).containsAll(tsfiles); } - assert !new File(config.getLastFileInfo()).exists(); + assert !new File(config.getLastFileInfoPath()).exists(); senderLogAnalyzer.recover(); assert !new File(config.getSnapshotPath()).exists(); - assert new File(config.getLastFileInfo()).exists(); + assert new File(config.getLastFileInfoPath()).exists(); } } \ No newline at end of file
