This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch Sync-Reconstruct in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 2f2e1d8e9c0922f9fcd5b4ac73c7e755d53f9865 Author: lta <[email protected]> AuthorDate: Sun Mar 17 15:01:26 2019 +0800 reconstruct thrift sync service --- .../iotdb/db/sync/receiver/ServerServiceImpl.java | 326 +++++++++++---------- .../iotdb/db/sync/sender/FileSenderImpl.java | 22 +- .../src/main/thrift/sync.thrift | 15 +- 3 files changed, 183 insertions(+), 180 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java index de2b2a3..435c20b 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java @@ -51,6 +51,8 @@ import org.apache.iotdb.db.metadata.MetadataOperationType; import org.apache.iotdb.db.qp.executor.OverflowQPExecutor; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.utils.SyncUtils; +import org.apache.iotdb.service.sync.thrift.SyncDataStatus; +import org.apache.iotdb.service.sync.thrift.SyncService; import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData; import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata; @@ -72,7 +74,7 @@ import org.slf4j.LoggerFactory; /** * @author Tianan Li */ -public class ServerServiceImpl implements ServerService.Iface { +public class ServerServiceImpl implements SyncService.Iface { private static final Logger logger = LoggerFactory.getLogger(ServerServiceImpl.class); private static final FileNodeManager fileNodeManager = FileNodeManager.getInstance(); @@ -145,75 +147,6 @@ public class ServerServiceImpl implements ServerService.Iface { return SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress); } - /** - * Start receiving tsfile from sender - * - * @param status status = 0 : finish receiving one tsfile status = 1 : a tsfile has not received - * completely. - */ - @Override - public String startReceiving(String md5OfSender, List<String> filePathSplit, - ByteBuffer dataToReceive, int status) throws TException { - String md5OfReceiver = ""; - StringBuilder filePathBuilder = new StringBuilder(); - FileChannel channel; - for (int i = 0; i < filePathSplit.size(); i++) { - if (i == filePathSplit.size() - 1) { - filePathBuilder.append(filePathSplit.get(i)); - } else { - filePathBuilder.append(filePathSplit.get(i)).append(File.separator); - } - } - String filePath = filePathBuilder.toString(); - filePath = postbackPath + uuid.get() + File.separator + filePath; - if (status == 1) { // there are still data stream to add - File file = new File(filePath); - if (!file.getParentFile().exists()) { - try { - file.getParentFile().mkdirs(); - if (!file.createNewFile()) { - logger.error("IoTDB post back receiver: cannot create file {}", file.getAbsoluteFile()); - } - } catch (IOException e) { - logger.error("IoTDB post back receiver: cannot make file", e); - } - } - try (FileOutputStream fos = new FileOutputStream(file, true)) {// append new data - channel = fos.getChannel(); - channel.write(dataToReceive); - } catch (IOException e) { - logger.error("IoTDB post back receiver: cannot write data to file", e); - - } - } else { // all data in the same file has received successfully - try (FileInputStream fis = new FileInputStream(filePath)) { - MessageDigest md = MessageDigest.getInstance("MD5"); - int mBufferSize = 8 * 1024 * 1024; - byte[] buffer = new byte[mBufferSize]; - int n; - while ((n = fis.read(buffer)) != -1) { - md.update(buffer, 0, n); - } - md5OfReceiver = (new BigInteger(1, md.digest())).toString(16); - if (md5OfSender.equals(md5OfReceiver)) { - fileNum.set(fileNum.get() + 1); - if (logger.isInfoEnabled()) { - logger.info(String - .format("IoTDB post back receiver : Receiver has received %d files from sender", - fileNum.get())); - } - } else { - if (!new File(filePath).delete()) { - logger.error("IoTDB post back receiver : Receiver can not delete file {}", - new File(filePath).getAbsolutePath()); - } - } - } catch (Exception e) { - logger.error("IoTDB post back receiver: cannot generate md5", e); - } - } - return md5OfReceiver; - } /** * Get schema from sender @@ -222,8 +155,8 @@ public class ServerServiceImpl implements ServerService.Iface { * IoTDB through jdbc status = 1 : the schema file has not received completely. */ @Override - public void getSchema(ByteBuffer schema, int status) { - if (status == 0) { + public void getSchema(ByteBuffer schema, SyncDataStatus status) { + if (status == SyncDataStatus.SUCCESS_STATUS) { /** sync metadata, include storage group and timeseries **/ syncMetadata(); } else { @@ -319,10 +252,81 @@ public class ServerServiceImpl implements ServerService.Iface { } } + /** + * Start receiving tsfile from sender + * + * @param status status = 0 : finish receiving one tsfile status = 1 : a tsfile has not received + * completely. + */ + @Override + public String receiveData(String md5OfSender, List<String> filePathSplit, + ByteBuffer dataToReceive, SyncDataStatus status) { + String md5OfReceiver = ""; + StringBuilder filePathBuilder = new StringBuilder(); + FileChannel channel; + for (int i = 0; i < filePathSplit.size(); i++) { + if (i == filePathSplit.size() - 1) { + filePathBuilder.append(filePathSplit.get(i)); + } else { + filePathBuilder.append(filePathSplit.get(i)).append(File.separator); + } + } + String filePath = filePathBuilder.toString(); + filePath = postbackPath + uuid.get() + File.separator + filePath; + if (status == SyncDataStatus.PROCESSING_STATUS) { // there are still data stream to add + File file = new File(filePath); + if (!file.getParentFile().exists()) { + try { + file.getParentFile().mkdirs(); + if (!file.createNewFile()) { + logger.error("IoTDB post back receiver: cannot create file {}", file.getAbsoluteFile()); + } + } catch (IOException e) { + logger.error("IoTDB post back receiver: cannot make file", e); + } + } + try (FileOutputStream fos = new FileOutputStream(file, true)) {// append new data + channel = fos.getChannel(); + channel.write(dataToReceive); + } catch (IOException e) { + logger.error("IoTDB post back receiver: cannot write data to file", e); + + } + } else { // all data in the same file has received successfully + try (FileInputStream fis = new FileInputStream(filePath)) { + MessageDigest md = MessageDigest.getInstance("MD5"); + int mBufferSize = 8 * 1024 * 1024; + byte[] buffer = new byte[mBufferSize]; + int n; + while ((n = fis.read(buffer)) != -1) { + md.update(buffer, 0, n); + } + md5OfReceiver = (new BigInteger(1, md.digest())).toString(16); + if (md5OfSender.equals(md5OfReceiver)) { + fileNum.set(fileNum.get() + 1); + if (logger.isInfoEnabled()) { + logger.info(String + .format("IoTDB post back receiver : Receiver has received %d files from sender", + fileNum.get())); + } + } else { + if (!new File(filePath).delete()) { + logger.error("IoTDB post back receiver : Receiver can not delete file {}", + new File(filePath).getAbsolutePath()); + } + } + } catch (Exception e) { + logger.error("IoTDB post back receiver: cannot generate md5", e); + } + } + return md5OfReceiver; + } + + @Override - public boolean merge() throws TException { + public boolean load() throws TException { getFileNodeInfo(); - mergeData(); + loadData(); try { SyncUtils.deleteFile(new File(postbackPath + this.uuid.get())); } catch (IOException e) { @@ -345,24 +349,9 @@ public class ServerServiceImpl implements ServerService.Iface { } /** - * Release threadLocal variable resources - */ - @Override - public void afterReceiving() { - uuid.remove(); - fileNum.remove(); - fileNodeMap.remove(); - fileNodeStartTime.remove(); - fileNodeEndTime.remove(); - schemaFromSenderPath.remove(); - logger.info("IoTDB post back receiver: the postBack has finished!"); - } - - /** * Get all tsfiles' info which are sent from sender, it is prepare for merging these data */ - @Override - public void getFileNodeInfo() throws TException { + public void getFileNodeInfo() { String filePath = postbackPath + uuid.get() + File.separator + "data"; File root = new File(filePath); File[] files = root.listFiles(); @@ -411,11 +400,84 @@ public class ServerServiceImpl implements ServerService.Iface { } } + + /** + * It is to merge data. If data in the tsfile is new, append the tsfile to the storage group + * directly. If data in the tsfile is old, it has two strategy to merge.It depends on the + * possibility of updating historical data. + */ + public void loadData() throws TException { + int num = 0; + for (String storageGroup : fileNodeMap.get().keySet()) { + List<String> filesPath = fileNodeMap.get().get(storageGroup); + // before load extern tsFile, it is necessary to order files in the same SG + for (int i = 0; i < filesPath.size(); i++) { + for (int j = i + 1; j < filesPath.size(); j++) { + boolean swapOrNot = false; + Map<String, Long> startTimeI = fileNodeStartTime.get().get(filesPath.get(i)); + Map<String, Long> endTimeI = fileNodeStartTime.get().get(filesPath.get(i)); + Map<String, Long> startTimeJ = fileNodeStartTime.get().get(filesPath.get(j)); + Map<String, Long> endTimeJ = fileNodeStartTime.get().get(filesPath.get(j)); + for (String deviceId : endTimeI.keySet()) { + if (startTimeJ.containsKey(deviceId) && startTimeI.get(deviceId) > + endTimeJ.get(deviceId)) { + swapOrNot = true; + break; + } + } + if (swapOrNot) { + String temp = filesPath.get(i); + filesPath.set(i, filesPath.get(j)); + filesPath.set(j, temp); + } + } + } + + for (String path : filesPath) { + // get startTimeMap and endTimeMap + Map<String, Long> startTimeMap = fileNodeStartTime.get().get(path); + Map<String, Long> endTimeMap = fileNodeEndTime.get().get(path); + + // create a new fileNode + String header = postbackPath + uuid.get() + File.separator + "data" + File.separator; + String relativePath = path.substring(header.length()); + TsFileResource fileNode = new TsFileResource(startTimeMap, endTimeMap, + OverflowChangeType.NO_CHANGE, + Directories.getInstance().getNextFolderIndexForTsFile(), relativePath); + // call interface of load external file + try { + if (!fileNodeManager.appendFileToFileNode(storageGroup, fileNode, path)) { + // it is a file with overflow data + if (config.isUpdate_historical_data_possibility()) { + loadOldData(path); + } else { + List<String> overlapFiles = fileNodeManager.getOverlapFilesFromFileNode( + storageGroup, + fileNode, uuid.get()); + if (overlapFiles.isEmpty()) { + loadOldData(path); + } else { + loadOldData(path, overlapFiles); + } + } + } + } catch (FileNodeManagerException e) { + logger.error("IoTDB receiver : Can not load external file ", e); + } + + num++; + if (logger.isInfoEnabled()) { + logger.info(String + .format("IoTDB receiver : Merging files has completed : %d/%d", num, fileNum.get())); + } + } + } + } + /** * Insert all data in the tsfile into IoTDB. */ - @Override - public void mergeOldData(String filePath) throws TException { + public void loadOldData(String filePath) throws TException { Set<String> timeseriesSet = new HashSet<>(); TsFileSequenceReader reader = null; OverflowQPExecutor insertExecutor = new OverflowQPExecutor(); @@ -493,7 +555,7 @@ public class ServerServiceImpl implements ServerService.Iface { * * @param overlapFiles:files which are conflict with the sync file */ - public void mergeOldData(String filePath, List<String> overlapFiles) { + public void loadOldData(String filePath, List<String> overlapFiles) { Set<String> timeseriesList = new HashSet<>(); TsFileSequenceReader reader = null; OverflowQPExecutor insertExecutor = new OverflowQPExecutor(); @@ -615,77 +677,17 @@ public class ServerServiceImpl implements ServerService.Iface { } /** - * It is to merge data. If data in the tsfile is new, append the tsfile to the storage group - * directly. If data in the tsfile is old, it has two strategy to merge.It depends on the - * possibility of updating historical data. + * Release threadLocal variable resources */ @Override - public void mergeData() throws TException { - int num = 0; - for (String storageGroup : fileNodeMap.get().keySet()) { - List<String> filesPath = fileNodeMap.get().get(storageGroup); - // before load extern tsFile, it is necessary to order files in the same SG - for (int i = 0; i < filesPath.size(); i++) { - for (int j = i + 1; j < filesPath.size(); j++) { - boolean swapOrNot = false; - Map<String, Long> startTimeI = fileNodeStartTime.get().get(filesPath.get(i)); - Map<String, Long> endTimeI = fileNodeStartTime.get().get(filesPath.get(i)); - Map<String, Long> startTimeJ = fileNodeStartTime.get().get(filesPath.get(j)); - Map<String, Long> endTimeJ = fileNodeStartTime.get().get(filesPath.get(j)); - for (String deviceId : endTimeI.keySet()) { - if (startTimeJ.containsKey(deviceId) && startTimeI.get(deviceId) > - endTimeJ.get(deviceId)) { - swapOrNot = true; - break; - } - } - if (swapOrNot) { - String temp = filesPath.get(i); - filesPath.set(i, filesPath.get(j)); - filesPath.set(j, temp); - } - } - } - - for (String path : filesPath) { - // get startTimeMap and endTimeMap - Map<String, Long> startTimeMap = fileNodeStartTime.get().get(path); - Map<String, Long> endTimeMap = fileNodeEndTime.get().get(path); - - // create a new fileNode - String header = postbackPath + uuid.get() + File.separator + "data" + File.separator; - String relativePath = path.substring(header.length()); - TsFileResource fileNode = new TsFileResource(startTimeMap, endTimeMap, - OverflowChangeType.NO_CHANGE, - Directories.getInstance().getNextFolderIndexForTsFile(), relativePath); - // call interface of load external file - try { - if (!fileNodeManager.appendFileToFileNode(storageGroup, fileNode, path)) { - // it is a file with overflow data - if (config.isUpdate_historical_data_possibility()) { - mergeOldData(path); - } else { - List<String> overlapFiles = fileNodeManager.getOverlapFilesFromFileNode( - storageGroup, - fileNode, uuid.get()); - if (overlapFiles.isEmpty()) { - mergeOldData(path); - } else { - mergeOldData(path, overlapFiles); - } - } - } - } catch (FileNodeManagerException e) { - logger.error("IoTDB receiver : Can not load external file ", e); - } - - num++; - if (logger.isInfoEnabled()) { - logger.info(String - .format("IoTDB receiver : Merging files has completed : %d/%d", num, fileNum.get())); - } - } - } + public void cleanUp() { + uuid.remove(); + fileNum.remove(); + fileNodeMap.remove(); + fileNodeStartTime.remove(); + fileNodeEndTime.remove(); + schemaFromSenderPath.remove(); + logger.info("IoTDB post back receiver: the postBack has finished!"); } public Map<String, List<String>> getFileNodeMap() { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java index ea34a29..e8939de 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java @@ -48,8 +48,9 @@ import org.apache.iotdb.db.exception.SyncConnectionException; import org.apache.iotdb.db.sync.conf.Constans; import org.apache.iotdb.db.sync.conf.SyncSenderConfig; import org.apache.iotdb.db.sync.conf.SyncSenderDescriptor; -import org.apache.iotdb.db.sync.receiver.ServerService; import org.apache.iotdb.db.utils.SyncUtils; +import org.apache.iotdb.service.sync.thrift.SyncDataStatus; +import org.apache.iotdb.service.sync.thrift.SyncService; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; @@ -68,7 +69,7 @@ public class FileSenderImpl implements FileSender { private static final Logger LOGGER = LoggerFactory.getLogger(FileSenderImpl.class); private TTransport transport; - private ServerService.Client serviceClient; + private SyncService.Client serviceClient; private List<String> schema = new ArrayList<>(); /** @@ -245,7 +246,7 @@ public class FileSenderImpl implements FileSender { // 8. notify receiver that synchronization finish // At this point the synchronization has finished even if connection fails try { - serviceClient.afterReceiving(); + serviceClient.cleanUp(); } catch (TException e) { LOGGER.error("unable to connect to receiver ", e); } @@ -291,7 +292,7 @@ public class FileSenderImpl implements FileSender { public void establishConnection(String serverIp, int serverPort) throws SyncConnectionException { transport = new TSocket(serverIp, serverPort); TProtocol protocol = new TBinaryProtocol(transport); - serviceClient = new ServerService.Client(protocol); + serviceClient = new SyncService.Client(protocol); try { transport.open(); } catch (TTransportException e) { @@ -401,7 +402,8 @@ public class FileSenderImpl implements FileSender { bos.write(buffer, 0, data); ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray()); bos.reset(); - serviceClient.startReceiving(null, filePathSplit, buffToSend, 1); + serviceClient + .receiveData(null, filePathSplit, buffToSend, SyncDataStatus.PROCESSING_STATUS); } bos.close(); } @@ -416,8 +418,8 @@ public class FileSenderImpl implements FileSender { // the file is sent successfully String md5OfSender = (new BigInteger(1, md.digest())).toString(16); - String md5OfReceiver = serviceClient.startReceiving(md5OfSender, filePathSplit, - null, 0); + String md5OfReceiver = serviceClient.receiveData(md5OfSender, filePathSplit, + null, SyncDataStatus.SUCCESS_STATUS); if (md5OfSender.equals(md5OfReceiver)) { LOGGER.info("receiver has received {} successfully.", snapshotFilePath); break; @@ -448,11 +450,11 @@ public class FileSenderImpl implements FileSender { ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray()); bos.reset(); // 1 represents there is still schema buffer to send. - serviceClient.getSchema(buffToSend, 1); + serviceClient.getSchema(buffToSend, SyncDataStatus.PROCESSING_STATUS); } bos.close(); // 0 represents the schema file has been transferred completely. - serviceClient.getSchema(null, 0); + serviceClient.getSchema(null, SyncDataStatus.SUCCESS_STATUS); } catch (Exception e) { LOGGER.error("cannot sync schema ", e); throw new SyncConnectionException(e); @@ -463,7 +465,7 @@ public class FileSenderImpl implements FileSender { public boolean afterSynchronization() throws SyncConnectionException { boolean successOrNot; try { - successOrNot = serviceClient.merge(); + successOrNot = serviceClient.load(); } catch (TException e) { throw new SyncConnectionException( "can not finish sync process because sync receiver has broken down.", e); diff --git a/iotdb/src/main/thrift/SyncServerService.thrift b/service-rpc/src/main/thrift/sync.thrift similarity index 80% rename from iotdb/src/main/thrift/SyncServerService.thrift rename to service-rpc/src/main/thrift/sync.thrift index d633d25..a947507 100755 --- a/iotdb/src/main/thrift/SyncServerService.thrift +++ b/service-rpc/src/main/thrift/sync.thrift @@ -16,23 +16,22 @@ * specific language governing permissions and limitations * under the License. */ -namespace java org.apache.iotdb.db.sync.receiver +namespace java org.apache.iotdb.service.sync.thrift typedef i32 int typedef i16 short typedef i64 long -enum SYNC_STATUS { +enum SyncDataStatus { SUCCESS_STATUS, - SYNC_STATUS - + PROCESSING_STATUS } -service ServerService{ +service SyncService{ bool getUUID(1:string uuid, 2:string address) - void getSchema(1:binary buff, 2:int status) - string receiveData(1:string md5, 2:list<string> filename, 3:binary buff, 4:int status) + void getSchema(1:binary buff, 2:SyncDataStatus status) + string receiveData(1:string md5, 2:list<string> filename, 3:binary buff, 4:SyncDataStatus status) bool load() - void afterReceiving() + void cleanUp() void init(1:string storageGroup) } \ No newline at end of file
