This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch Sync-Reconstruct in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit ab830f9b15060898ea040f473353e36c818c5a51 Author: lta <[email protected]> AuthorDate: Sun Mar 17 16:12:02 2019 +0800 reconstruct sync server --- .../db/engine/filenode/FileNodeProcessor.java | 10 +- .../org/apache/iotdb/db/sync/conf/Constans.java | 5 +- .../iotdb/db/sync/conf/SyncSenderConfig.java | 6 +- .../iotdb/db/sync/conf/SyncSenderDescriptor.java | 6 +- .../iotdb/db/sync/receiver/ServerServiceImpl.java | 232 +++++++++++++-------- .../apache/iotdb/db/sync/sender/FileManager.java | 2 +- .../iotdb/db/sync/sender/FileSenderImpl.java | 12 +- .../iotdb/db/sync/sender/FileManagerTest.java | 2 +- service-rpc/src/main/thrift/sync.thrift | 6 +- 9 files changed, 164 insertions(+), 117 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java index 3fddd8d..be775a5 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.engine.filenode; +import static java.time.ZonedDateTime.ofInstant; + import java.io.File; import java.io.IOException; import java.nio.file.FileSystems; @@ -41,7 +43,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; - import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -102,8 +103,6 @@ import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static java.time.ZonedDateTime.ofInstant; - public class FileNodeProcessor extends Processor implements IStatistic { private static final String WARN_NO_SUCH_OVERFLOWED_FILE = "Can not find any tsfile which" @@ -838,9 +837,8 @@ public class FileNodeProcessor extends Processor implements IStatistic { appendFile.getFilePath())); } if (!originFile.renameTo(targetFile)) { - LOGGER.warn("File renaming failed when appending new file. Origin: {}, target: {}", - originFile.getPath(), - targetFile.getPath()); + LOGGER.warn("File renaming failed when appending new file. Origin: {}, Target: {}", + originFile.getPath(), targetFile.getPath()); } // append the new tsfile this.newFileNodes.add(appendFile); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java index 31e0ad4..cc2581a 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java @@ -27,11 +27,12 @@ public class Constans { } public static final String CONFIG_NAME = "iotdb-sync-client.properties"; - public static final String SYNC = "sync"; + public static final String SYNC_CLIENT = "sync-client"; + public static final String SYNC_SERVER = "sync-server"; public static final String UUID_FILE_NAME = "uuid.txt"; public static final String LAST_LOCAL_FILE_NAME = "last_local_files.txt"; - public static final String DATA_SNAPSHOT_NAME = "data_snapshot"; + public static final String DATA_SNAPSHOT_NAME = "data-snapshot"; /** * Split data file , block size at each transmission diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java index 0def77c..fe33f2e 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java @@ -51,9 +51,9 @@ public class SyncSenderConfig { && dataDirectory.charAt(dataDirectory.length() - 1) != File.separatorChar) { dataDirectory += File.separatorChar; } - uuidPath = dataDirectory + Constans.SYNC + File.separatorChar + Constans.UUID_FILE_NAME; + uuidPath = dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.UUID_FILE_NAME; lastFileInfo = - dataDirectory + Constans.SYNC + File.separatorChar + Constans.LAST_LOCAL_FILE_NAME; + dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.LAST_LOCAL_FILE_NAME; snapshotPaths = new String[iotdbBufferwriteDirectory.length]; for (int i = 0; i < iotdbBufferwriteDirectory.length; i++) { iotdbBufferwriteDirectory[i] = new File(iotdbBufferwriteDirectory[i]).getAbsolutePath(); @@ -62,7 +62,7 @@ public class SyncSenderConfig { != File.separatorChar) { iotdbBufferwriteDirectory[i] = iotdbBufferwriteDirectory[i] + File.separatorChar; } - snapshotPaths[i] = iotdbBufferwriteDirectory[i] + Constans.SYNC + File.separatorChar + snapshotPaths[i] = iotdbBufferwriteDirectory[i] + Constans.SYNC_CLIENT + File.separatorChar + Constans.DATA_SNAPSHOT_NAME + File.separatorChar; } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java index 04a5e7d..c04336f 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java @@ -104,9 +104,9 @@ public class SyncSenderDescriptor { dataDirectory += File.separatorChar; } conf.setUuidPath( - dataDirectory + Constans.SYNC + File.separatorChar + Constans.UUID_FILE_NAME); + dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.UUID_FILE_NAME); conf.setLastFileInfo( - dataDirectory + Constans.SYNC + File.separatorChar + Constans.LAST_LOCAL_FILE_NAME); + dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.LAST_LOCAL_FILE_NAME); String[] iotdbBufferwriteDirectory = conf.getIotdbBufferwriteDirectory(); String[] snapshots = new String[conf.getIotdbBufferwriteDirectory().length]; for (int i = 0; i < conf.getIotdbBufferwriteDirectory().length; i++) { @@ -115,7 +115,7 @@ public class SyncSenderDescriptor { != File.separatorChar) { iotdbBufferwriteDirectory[i] = iotdbBufferwriteDirectory[i] + File.separatorChar; } - snapshots[i] = iotdbBufferwriteDirectory[i] + Constans.SYNC + File.separatorChar + snapshots[i] = iotdbBufferwriteDirectory[i] + Constans.SYNC_CLIENT + File.separatorChar + Constans.DATA_SNAPSHOT_NAME + File.separatorChar; } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java index 435c20b..9674767 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java @@ -35,6 +35,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -47,9 +48,11 @@ import org.apache.iotdb.db.exception.MetadataArgsErrorException; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.exception.ProcessorException; import org.apache.iotdb.db.metadata.MManager; +import org.apache.iotdb.db.metadata.MetadataConstant; import org.apache.iotdb.db.metadata.MetadataOperationType; import org.apache.iotdb.db.qp.executor.OverflowQPExecutor; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.sync.conf.Constans; import org.apache.iotdb.db.utils.SyncUtils; import org.apache.iotdb.service.sync.thrift.SyncDataStatus; import org.apache.iotdb.service.sync.thrift.SyncService; @@ -77,27 +80,57 @@ import org.slf4j.LoggerFactory; public class ServerServiceImpl implements SyncService.Iface { private static final Logger logger = LoggerFactory.getLogger(ServerServiceImpl.class); + private static final FileNodeManager fileNodeManager = FileNodeManager.getInstance(); + /** + * Metadata manager + **/ private static final MManager metadataManger = MManager.getInstance(); - private static final String POSTBACK = "sync"; + + private static final String SYNC_SERVER = Constans.SYNC_SERVER; + private ThreadLocal<String> uuid = new ThreadLocal<>(); - // String means Storage Group,List means the set of new Files(AbsulutePath) in local IoTDB - // String means AbsulutePath of new Files + /** + * String means storage group,List means the set of new files(path) in local IoTDB and String + * means path of new Files + **/ private ThreadLocal<Map<String, List<String>>> fileNodeMap = new ThreadLocal<>(); - // Map String1 means timeseries String2 means AbsulutePath of new Files, long means startTime + /** + * Map String1 means timeseries String2 means path of new Files, long means startTime + **/ private ThreadLocal<Map<String, Map<String, Long>>> fileNodeStartTime = new ThreadLocal<>(); - // Map String1 means timeseries String2 means AbsulutePath of new Files, long means endTime + /** + * Map String1 means timeseries String2 means path of new Files, long means endTime + **/ private ThreadLocal<Map<String, Map<String, Long>>> fileNodeEndTime = new ThreadLocal<>(); + + /** + * Total num of files that needs to be loaded + */ private ThreadLocal<Integer> fileNum = new ThreadLocal<>(); + + /** + * IoTDB ioTDBConfig + **/ + private IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig(); + + /** + * IoTDB data directory + **/ + private String dataPath = ioTDBConfig.getDataDir(); + + /** + * IoTDB multiple bufferWrite directory + **/ + private String[] bufferWritePaths = ioTDBConfig.getBufferWriteDirs(); + + /** + * The path to store metadata file of sender + */ private ThreadLocal<String> schemaFromSenderPath = new ThreadLocal<>(); - private IoTDBConfig tsfileDBconfig = IoTDBDescriptor.getInstance().getConfig(); - private String postbackPath; - // Absolute seriesPath of IoTDB data directory - private String dataPath = - new File(tsfileDBconfig.getDataDir()).getAbsolutePath() + File.separator; - // Absolute paths of IoTDB bufferWrite directory - private String[] bufferWritePaths = tsfileDBconfig.getBufferWriteDirs(); - private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + + /** Sync path of server **/ + private String syncPath; /** * Init threadLocal variable @@ -105,9 +138,7 @@ public class ServerServiceImpl implements SyncService.Iface { @Override public void init(String storageGroup) { if (logger.isInfoEnabled()) { - logger.info( - "IoTDB post back receiver: sync process starts to receive data of storage group {}", - storageGroup); + logger.info("Sync process starts to receive data of storage group {}", storageGroup); } fileNum.set(0); fileNodeMap.set(new HashMap<>()); @@ -119,73 +150,88 @@ public class ServerServiceImpl implements SyncService.Iface { * Verify IP address of sender */ @Override - public boolean getUUID(String uuid, String ipAddress) throws TException { + public boolean checkIdentity(String uuid, String ipAddress) throws TException { this.uuid.set(uuid); - postbackPath = dataPath + POSTBACK + File.separator; - schemaFromSenderPath.set(postbackPath + this.uuid.get() + File.separator + "mlog.txt"); - if (new File(postbackPath + this.uuid.get()).exists() - && new File(postbackPath + this.uuid.get()).list().length != 0) { + initPath(); + return SyncUtils.verifyIPSegment(ioTDBConfig.getIpWhiteList(), ipAddress); + } + + /** + * Init file path and clear data if last sync process failed. + */ + private void initPath() throws TException { + if (dataPath.length() > 0 && dataPath.charAt(dataPath.length() - 1) != File.separatorChar) { + dataPath = dataPath + File.separatorChar; + } + syncPath = dataPath + SYNC_SERVER + File.separator; + schemaFromSenderPath + .set(syncPath + this.uuid.get() + File.separator + MetadataConstant.METADATA_LOG); + File syncFileDirectory = new File(syncPath,this.uuid.get()); + if (syncFileDirectory.exists() + && Objects.requireNonNull(syncFileDirectory.list()).length != 0) { try { - SyncUtils.deleteFile(new File(postbackPath + this.uuid.get())); + SyncUtils.deleteFile(syncFileDirectory); } catch (IOException e) { + logger.error("Cannot clear useless metadata file."); throw new TException(e); } } for (String bufferWritePath : bufferWritePaths) { - String backupPath = bufferWritePath + POSTBACK + File.separator; - if (new File(backupPath + this.uuid.get()).exists() - && new File(backupPath + this.uuid.get()).list().length != 0) { - // if does not exist, it means that the last time sync failed, clear uuid - // data and receive the data again + if (bufferWritePath.length() > 0 + && bufferWritePath.charAt(bufferWritePath.length() - 1) != File.separatorChar) { + bufferWritePath = bufferWritePath + File.separatorChar; + } + String backupPath = bufferWritePath + SYNC_SERVER + File.separator; + File backupDirectory = new File(backupPath, this.uuid.get()); + if (backupDirectory.exists() && Objects.requireNonNull(backupDirectory.list()).length != 0) { + /** if does not exist, it means that the last time sync failed, clear data in the uuid directory and receive the data again **/ try { - SyncUtils.deleteFile(new File(backupPath + this.uuid.get())); + SyncUtils.deleteFile(backupDirectory); } catch (IOException e) { + logger.error("Cannot clear useless backup data file"); throw new TException(e); } } } - return SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress); } - /** - * Get schema from sender + * Acquire schema from sender * - * @param status: 0 or 1. status = 0 : finish receiving schema file, start to insert schema to - * IoTDB through jdbc status = 1 : the schema file has not received completely. + * @param status: SUCCESS_STATUS or PROCESSING_STATUS. status = SUCCESS_STATUS : finish receiving schema file, start to sync schema. + * status = SUCCESS_STATUS : the schema file has not received completely. */ @Override - public void getSchema(ByteBuffer schema, SyncDataStatus status) { + public void syncSchema(ByteBuffer schema, SyncDataStatus status) { if (status == SyncDataStatus.SUCCESS_STATUS) { /** sync metadata, include storage group and timeseries **/ - syncMetadata(); + loadMetadata(); } else { File file = new File(schemaFromSenderPath.get()); if (!file.getParentFile().exists()) { try { file.getParentFile().mkdirs(); if (!file.createNewFile()) { - logger.error("IoTDB post back receiver: cannot create file {}", - file.getAbsoluteFile()); + logger.error("Cannot create file {}", file.getPath()); } } catch (IOException e) { - logger.error("IoTDB post back receiver: cannot make schema file.", e); + logger.error("Cannot make schema file {}.", file.getPath(),e); } } try (FileOutputStream fos = new FileOutputStream(file, true); FileChannel channel = fos.getChannel()) { channel.write(schema); } catch (Exception e) { - logger.error("IoTDB post back receiver: cannot write data to file.", e); + logger.error("Cannot write data to file {}.", file.getPath(),e); } } } /** - * Sync metadata with sender + * Load metadata from sender */ - private void syncMetadata() { + private void loadMetadata() { if (new File(schemaFromSenderPath.get()).exists()) { try (BufferedReader br = new BufferedReader( new java.io.FileReader(schemaFromSenderPath.get()))) { @@ -194,12 +240,12 @@ public class ServerServiceImpl implements SyncService.Iface { operation(metadataOperation); } } catch (FileNotFoundException e) { - logger.error("IoTDB post back receiver: cannot read the file {}.", + logger.error("Cannot read the file {}.", schemaFromSenderPath.get(), e); } catch (IOException e) { - logger.error("IoTDB post back receiver: cannot insert schema to IoTDB.", e); + logger.error("Cannot insert schema to IoTDB.", e); } catch (Exception e) { - logger.error("IoTDB post back receiver: parse metadata operation failed.", e); + logger.error("Parse metadata operation failed.", e); } } } @@ -255,15 +301,16 @@ public class ServerServiceImpl implements SyncService.Iface { /** * Start receiving tsfile from sender * - * @param status status = 0 : finish receiving one tsfile status = 1 : a tsfile has not received + * @param status status = SUCCESS_STATUS : finish receiving one tsfile status = PROCESSING_STATUS : tsfile has not received * completely. */ @Override - public String receiveData(String md5OfSender, List<String> filePathSplit, + public String syncData(String md5OfSender, List<String> filePathSplit, ByteBuffer dataToReceive, SyncDataStatus status) { String md5OfReceiver = ""; StringBuilder filePathBuilder = new StringBuilder(); FileChannel channel; + /**Recombination File Path**/ for (int i = 0; i < filePathSplit.size(); i++) { if (i == filePathSplit.size() - 1) { filePathBuilder.append(filePathSplit.get(i)); @@ -272,31 +319,33 @@ public class ServerServiceImpl implements SyncService.Iface { } } String filePath = filePathBuilder.toString(); - filePath = postbackPath + uuid.get() + File.separator + filePath; + if(syncPath.length() > 0 && syncPath.charAt(syncPath.length()-1)!=File.separatorChar){ + syncPath = syncPath+File.separatorChar; + } + filePath = syncPath + uuid.get() + File.separator + filePath; if (status == SyncDataStatus.PROCESSING_STATUS) { // there are still data stream to add File file = new File(filePath); if (!file.getParentFile().exists()) { try { file.getParentFile().mkdirs(); if (!file.createNewFile()) { - logger.error("IoTDB post back receiver: cannot create file {}", file.getAbsoluteFile()); + logger.error("cannot create file {}", file.getPath()); } } catch (IOException e) { - logger.error("IoTDB post back receiver: cannot make file", e); + logger.error("cannot make file {}", file.getPath(), e); } } try (FileOutputStream fos = new FileOutputStream(file, true)) {// append new data channel = fos.getChannel(); channel.write(dataToReceive); } catch (IOException e) { - logger.error("IoTDB post back receiver: cannot write data to file", e); + logger.error("cannot write data to file {}",file.getPath(), e); } } else { // all data in the same file has received successfully try (FileInputStream fis = new FileInputStream(filePath)) { MessageDigest md = MessageDigest.getInstance("MD5"); - int mBufferSize = 8 * 1024 * 1024; - byte[] buffer = new byte[mBufferSize]; + byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE]; int n; while ((n = fis.read(buffer)) != -1) { md.update(buffer, 0, n); @@ -305,18 +354,15 @@ public class ServerServiceImpl implements SyncService.Iface { if (md5OfSender.equals(md5OfReceiver)) { fileNum.set(fileNum.get() + 1); if (logger.isInfoEnabled()) { - logger.info(String - .format("IoTDB post back receiver : Receiver has received %d files from sender", - fileNum.get())); + logger.info(String.format("Receiver has received %d files from sender", fileNum.get())); } } else { if (!new File(filePath).delete()) { - logger.error("IoTDB post back receiver : Receiver can not delete file {}", - new File(filePath).getAbsolutePath()); + logger.error("Receiver can not delete file {}", new File(filePath).getPath()); } } } catch (Exception e) { - logger.error("IoTDB post back receiver: cannot generate md5", e); + logger.error("Receiver cannot generate md5", e); } } return md5OfReceiver; @@ -328,19 +374,22 @@ public class ServerServiceImpl implements SyncService.Iface { getFileNodeInfo(); loadData(); try { - SyncUtils.deleteFile(new File(postbackPath + this.uuid.get())); + SyncUtils.deleteFile(new File(syncPath + this.uuid.get())); } catch (IOException e) { throw new TException(e); } for (String bufferWritePath : bufferWritePaths) { - String backupPath = bufferWritePath + POSTBACK + File.separator; - if (new File(backupPath + this.uuid.get()).exists() - && new File(backupPath + this.uuid.get()).list().length != 0) { - // if does not exist, it means that the last time sync process failed, clear - // uuid data and receive the data again + if (bufferWritePath.length() > 0 + && bufferWritePath.charAt(bufferWritePath.length() - 1) != File.separatorChar) { + bufferWritePath = bufferWritePath + File.separatorChar; + } + String backupPath = bufferWritePath + SYNC_SERVER + File.separator; + File backupDirectory = new File(backupPath, this.uuid.get()); + if (backupDirectory.exists() && Objects.requireNonNull(backupDirectory.list()).length != 0) { try { - SyncUtils.deleteFile(new File(backupPath + this.uuid.get())); + SyncUtils.deleteFile(backupDirectory); } catch (IOException e) { + logger.error("Cannot clear useless backup data file"); throw new TException(e); } } @@ -352,10 +401,9 @@ public class ServerServiceImpl implements SyncService.Iface { * Get all tsfiles' info which are sent from sender, it is prepare for merging these data */ public void getFileNodeInfo() { - String filePath = postbackPath + uuid.get() + File.separator + "data"; - File root = new File(filePath); - File[] files = root.listFiles(); - int num = 0; + File dataFileRoot = new File(syncPath,this.uuid.get()); + File[] files = dataFileRoot.listFiles(); + int processedNum = 0; for (File storageGroupPB : files) { List<String> filesPath = new ArrayList<>(); File[] filesSG = storageGroupPB.listFiles(); @@ -364,36 +412,33 @@ public class ServerServiceImpl implements SyncService.Iface { Map<String, Long> endTimeMap = new HashMap<>(); TsFileSequenceReader reader = null; try { - reader = new TsFileSequenceReader(fileTF.getAbsolutePath()); + reader = new TsFileSequenceReader(fileTF.getPath()); Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap(); Iterator<String> it = deviceIdMap.keySet().iterator(); while (it.hasNext()) { - String key = it.next(); // key represent device + String key = it.next(); TsDeviceMetadataIndex device = deviceIdMap.get(key); startTimeMap.put(key, device.getStartTime()); endTimeMap.put(key, device.getEndTime()); } } catch (Exception e) { - logger.error("IoTDB post back receiver: unable to read tsfile {}", - fileTF.getAbsolutePath(), e); + logger.error("Unable to read tsfile {}", fileTF.getPath(), e); } finally { try { if (reader != null) { reader.close(); } } catch (IOException e) { - logger.error("IoTDB receiver : Cannot close file stream {}", - fileTF.getAbsolutePath(), e); + logger.error("Cannot close file stream {}", fileTF.getPath(), e); } } fileNodeStartTime.get().put(fileTF.getAbsolutePath(), startTimeMap); fileNodeEndTime.get().put(fileTF.getAbsolutePath(), endTimeMap); filesPath.add(fileTF.getAbsolutePath()); - num++; + processedNum++; if (logger.isInfoEnabled()) { logger.info(String - .format("IoTDB receiver : Getting FileNode Info has complete : %d/%d", num, - fileNum.get())); + .format("Get tsfile info has complete : %d/%d", processedNum, fileNum.get())); } fileNodeMap.get().put(storageGroupPB.getName(), filesPath); } @@ -406,11 +451,14 @@ public class ServerServiceImpl implements SyncService.Iface { * directly. If data in the tsfile is old, it has two strategy to merge.It depends on the * possibility of updating historical data. */ - public void loadData() throws TException { - int num = 0; + public void loadData() { + if(syncPath.length() > 0 && syncPath.charAt(syncPath.length()-1) != File.separatorChar){ + syncPath =syncPath + File.separatorChar; + } + int processedNum = 0; for (String storageGroup : fileNodeMap.get().keySet()) { List<String> filesPath = fileNodeMap.get().get(storageGroup); - // before load extern tsFile, it is necessary to order files in the same SG + /** before load external tsFile, it is necessary to order files in the same storage group **/ for (int i = 0; i < filesPath.size(); i++) { for (int j = i + 1; j < filesPath.size(); j++) { boolean swapOrNot = false; @@ -439,7 +487,7 @@ public class ServerServiceImpl implements SyncService.Iface { Map<String, Long> endTimeMap = fileNodeEndTime.get().get(path); // create a new fileNode - String header = postbackPath + uuid.get() + File.separator + "data" + File.separator; + String header = syncPath + uuid.get() + File.separator; String relativePath = path.substring(header.length()); TsFileResource fileNode = new TsFileResource(startTimeMap, endTimeMap, OverflowChangeType.NO_CHANGE, @@ -448,7 +496,7 @@ public class ServerServiceImpl implements SyncService.Iface { try { if (!fileNodeManager.appendFileToFileNode(storageGroup, fileNode, path)) { // it is a file with overflow data - if (config.isUpdate_historical_data_possibility()) { + if (ioTDBConfig.isUpdate_historical_data_possibility()) { loadOldData(path); } else { List<String> overlapFiles = fileNodeManager.getOverlapFilesFromFileNode( @@ -462,13 +510,13 @@ public class ServerServiceImpl implements SyncService.Iface { } } } catch (FileNodeManagerException e) { - logger.error("IoTDB receiver : Can not load external file ", e); + logger.error("Can not load external file ", e); } - num++; + processedNum++; if (logger.isInfoEnabled()) { logger.info(String - .format("IoTDB receiver : Merging files has completed : %d/%d", num, fileNum.get())); + .format("Merging files has completed : %d/%d", processedNum, fileNum.get())); } } } @@ -477,7 +525,7 @@ public class ServerServiceImpl implements SyncService.Iface { /** * Insert all data in the tsfile into IoTDB. */ - public void loadOldData(String filePath) throws TException { + public void loadOldData(String filePath) { Set<String> timeseriesSet = new HashSet<>(); TsFileSequenceReader reader = null; OverflowQPExecutor insertExecutor = new OverflowQPExecutor(); @@ -536,7 +584,7 @@ public class ServerServiceImpl implements SyncService.Iface { } } } catch (IOException e) { - logger.error("IoTDB receiver can not parse tsfile into SQL", e); + logger.error("Receiver can not parse tsfile into SQL", e); } catch (ProcessorException e) { logger.error("Meet error while processing non-query.", e); } finally { @@ -545,7 +593,7 @@ public class ServerServiceImpl implements SyncService.Iface { reader.close(); } } catch (IOException e) { - logger.error("IoTDB receiver : Cannot close file stream {}", filePath, e); + logger.error("Cannot close file stream {}", filePath, e); } } } @@ -664,14 +712,14 @@ public class ServerServiceImpl implements SyncService.Iface { } } } catch (IOException e) { - logger.error("IoTDB receiver can not parse tsfile into SQL", e); + logger.error("Can not parse tsfile into SQL", e); } catch (ProcessorException e) { logger.error("Meet error while processing non-query.", e); } finally { try { reader.close(); } catch (IOException e) { - logger.error("IoTDB receiver : Cannot close file stream {}", filePath, e); + logger.error("Cannot close file stream {}", filePath, e); } } } @@ -687,7 +735,7 @@ public class ServerServiceImpl implements SyncService.Iface { fileNodeStartTime.remove(); fileNodeEndTime.remove(); schemaFromSenderPath.remove(); - logger.info("IoTDB post back receiver: the postBack has finished!"); + logger.info("Synchronization has finished!"); } public Map<String, List<String>> getFileNodeMap() { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileManager.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileManager.java index f0a3b66..d2d4258 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileManager.java @@ -149,7 +149,7 @@ public class FileManager { } File[] listFiles = new File(path).listFiles(); for (File storageGroup : listFiles) { - if (storageGroup.isDirectory() && !storageGroup.getName().equals(Constans.SYNC)) { + if (storageGroup.isDirectory() && !storageGroup.getName().equals(Constans.SYNC_CLIENT)) { if (!currentLocalFiles.containsKey(storageGroup.getName())) { currentLocalFiles.put(storageGroup.getName(), new HashSet<>()); } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java index e8939de..c099c56 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java @@ -332,7 +332,7 @@ public class FileSenderImpl implements FileSender { } boolean legalConnection; try { - legalConnection = serviceClient.getUUID(uuid, + legalConnection = serviceClient.checkIdentity(uuid, InetAddress.getLocalHost().getHostAddress()); } catch (Exception e) { LOGGER.error("cannot confirm identity with receiver"); @@ -342,7 +342,7 @@ public class FileSenderImpl implements FileSender { } private String generateUUID() { - return Constans.SYNC + UUID.randomUUID().toString().replaceAll("-", ""); + return Constans.SYNC_CLIENT + UUID.randomUUID().toString().replaceAll("-", ""); } /** @@ -403,7 +403,7 @@ public class FileSenderImpl implements FileSender { ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray()); bos.reset(); serviceClient - .receiveData(null, filePathSplit, buffToSend, SyncDataStatus.PROCESSING_STATUS); + .syncData(null, filePathSplit, buffToSend, SyncDataStatus.PROCESSING_STATUS); } bos.close(); } @@ -418,7 +418,7 @@ public class FileSenderImpl implements FileSender { // the file is sent successfully String md5OfSender = (new BigInteger(1, md.digest())).toString(16); - String md5OfReceiver = serviceClient.receiveData(md5OfSender, filePathSplit, + String md5OfReceiver = serviceClient.syncData(md5OfSender, filePathSplit, null, SyncDataStatus.SUCCESS_STATUS); if (md5OfSender.equals(md5OfReceiver)) { LOGGER.info("receiver has received {} successfully.", snapshotFilePath); @@ -450,11 +450,11 @@ public class FileSenderImpl implements FileSender { ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray()); bos.reset(); // 1 represents there is still schema buffer to send. - serviceClient.getSchema(buffToSend, SyncDataStatus.PROCESSING_STATUS); + serviceClient.syncSchema(buffToSend, SyncDataStatus.PROCESSING_STATUS); } bos.close(); // 0 represents the schema file has been transferred completely. - serviceClient.getSchema(null, SyncDataStatus.SUCCESS_STATUS); + serviceClient.syncSchema(null, SyncDataStatus.SUCCESS_STATUS); } catch (Exception e) { LOGGER.error("cannot sync schema ", e); throw new SyncConnectionException(e); diff --git a/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/FileManagerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/FileManagerTest.java index a2dd6c6..a23bdbc 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/FileManagerTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/FileManagerTest.java @@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory; public class FileManagerTest { - public static final String POST_BACK_DIRECTORY_TEST = Constans.SYNC + File.separator; + public static final String POST_BACK_DIRECTORY_TEST = Constans.SYNC_CLIENT + File.separator; public static final String LAST_FILE_INFO_TEST = POST_BACK_DIRECTORY_TEST + Constans.LAST_LOCAL_FILE_NAME; public static final String SENDER_FILE_PATH_TEST = POST_BACK_DIRECTORY_TEST + "data"; diff --git a/service-rpc/src/main/thrift/sync.thrift b/service-rpc/src/main/thrift/sync.thrift index a947507..e139783 100755 --- a/service-rpc/src/main/thrift/sync.thrift +++ b/service-rpc/src/main/thrift/sync.thrift @@ -28,9 +28,9 @@ enum SyncDataStatus { } service SyncService{ - bool getUUID(1:string uuid, 2:string address) - void getSchema(1:binary buff, 2:SyncDataStatus status) - string receiveData(1:string md5, 2:list<string> filename, 3:binary buff, 4:SyncDataStatus status) + bool checkIdentity(1:string uuid, 2:string address) + void syncSchema(1:binary buff, 2:SyncDataStatus status) + string syncData(1:string md5, 2:list<string> filename, 3:binary buff, 4:SyncDataStatus status) bool load() void cleanUp() void init(1:string storageGroup)
