This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch Sync-Reconstruct in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 96f371022fb2bbe5cbf80f9f6e5df927344b19e5 Author: lta <[email protected]> AuthorDate: Sun Mar 17 14:36:03 2019 +0800 rename postback to sync and reconstruct sync client --- ...rt-postBackClient.bat => start-sync-client.bat} | 0 ...tart-postBackClient.sh => start-sync-client.sh} | 0 ...top-postBackClient.bat => stop-sync-client.bat} | 0 ...{stop-postBackClient.sh => stop-sync-client.sh} | 0 iotdb/iotdb/conf/iotdb-engine.properties | 2 +- ...ent.properties => iotdb-sync-client.properties} | 0 .../org/apache/iotdb/db/concurrent/ThreadName.java | 5 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 4 +- .../db/engine/filenode/FileNodeProcessor.java | 2 +- .../db/exception/SyncConnectionException.java} | 75 +-- .../iotdb/db/postback/sender/FileSenderImpl.java | 510 -------------------- .../java/org/apache/iotdb/db/service/IoTDB.java | 2 +- .../org/apache/iotdb/db/sync/conf/Constans.java} | 75 +-- .../conf/SyncSenderConfig.java} | 46 +- .../conf/SyncSenderDescriptor.java} | 77 +-- .../{postback => sync}/receiver/ServerManager.java | 26 +- .../receiver/ServerServiceImpl.java | 439 +++++++++--------- .../db/{postback => sync}/sender/FileManager.java | 87 ++-- .../db/{postback => sync}/sender/FileSender.java | 29 +- .../iotdb/db/sync/sender/FileSenderImpl.java | 516 +++++++++++++++++++++ .../utils/{PostbackUtils.java => SyncUtils.java} | 74 ++- ...rverService.thrift => SyncServerService.thrift} | 16 +- .../filenodev2/FileNodeManagerBenchmark.java | 2 +- .../{postback => sync}/sender/FileManagerTest.java | 21 +- .../sender/IoTDBSingleClientPostBackTest.java | 28 +- .../sender/MultipleClientPostBackTest.java | 2 +- .../iotdb/db/sync/test}/CreateDataSender1.java | 10 +- .../iotdb/db/sync/test}/CreateDataSender2.java | 8 +- .../iotdb/db/sync/test}/CreateDataSender3.java | 8 +- .../org/apache/iotdb/db/sync/test}/RandomNum.java | 2 +- .../java/org/apache/iotdb/db/sync/test}/Utils.java | 4 +- 31 files changed, 1078 insertions(+), 992 deletions(-) diff --git a/iotdb/iotdb/bin/start-postBackClient.bat b/iotdb/iotdb/bin/start-sync-client.bat similarity index 100% rename from iotdb/iotdb/bin/start-postBackClient.bat rename to iotdb/iotdb/bin/start-sync-client.bat diff --git a/iotdb/iotdb/bin/start-postBackClient.sh b/iotdb/iotdb/bin/start-sync-client.sh similarity index 100% rename from iotdb/iotdb/bin/start-postBackClient.sh rename to iotdb/iotdb/bin/start-sync-client.sh diff --git a/iotdb/iotdb/bin/stop-postBackClient.bat b/iotdb/iotdb/bin/stop-sync-client.bat similarity index 100% rename from iotdb/iotdb/bin/stop-postBackClient.bat rename to iotdb/iotdb/bin/stop-sync-client.bat diff --git a/iotdb/iotdb/bin/stop-postBackClient.sh b/iotdb/iotdb/bin/stop-sync-client.sh similarity index 100% rename from iotdb/iotdb/bin/stop-postBackClient.sh rename to iotdb/iotdb/bin/stop-sync-client.sh diff --git a/iotdb/iotdb/conf/iotdb-engine.properties b/iotdb/iotdb/conf/iotdb-engine.properties index fe6d154..cdca52c 100644 --- a/iotdb/iotdb/conf/iotdb-engine.properties +++ b/iotdb/iotdb/conf/iotdb-engine.properties @@ -211,7 +211,7 @@ postback_server_port=5555 # White IP list of Postback client. # Please use the form of network segment to present the range of IP, for example: 192.168.0.0/16 # If there are more than one IP segment, please separate them by commas -# The default is to allow all IP to postback +# The default is to allow all IP to sync IP_white_list=0.0.0.0/0 # Choose a postBack strategy of merging historical data: diff --git a/iotdb/iotdb/conf/iotdb-postbackClient.properties b/iotdb/iotdb/conf/iotdb-sync-client.properties similarity index 100% rename from iotdb/iotdb/conf/iotdb-postbackClient.properties rename to iotdb/iotdb/conf/iotdb-sync-client.properties diff --git a/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java index 04ca847..6053da6 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java @@ -34,7 +34,10 @@ public enum ThreadName { FLUSH_SERVICE("Flush-ServerServiceImpl"), WAL_DAEMON("IoTDB-MultiFileLogNodeManager-Sync-Thread"), WAL_FORCE_DAEMON("IoTDB-MultiFileLogNodeManager-Force-Thread"), - INDEX_SERVICE("Index-ServerServiceImpl"); + INDEX_SERVICE("Index-ServerServiceImpl"), + SYNC_CLIENT("Sync-Client"), + SYNC_SERVER("Sync-Server"), + SYNC_MONITOR("Sync-Monitor"); private String name; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 22f17ae..815e36b 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -253,11 +253,11 @@ public class IoTDBConfig { */ private int maxLogEntrySize = 4 * 1024 * 1024; /** - * Is this IoTDB instance a receiver of postback or not. + * Is this IoTDB instance a receiver of sync or not. */ private boolean isPostbackEnable = true; /** - * If this IoTDB instance is a receiver of postback, set the server port. + * If this IoTDB instance is a receiver of sync, set the server port. */ private int postbackServerPort = 5555; /* diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java index 6f56ebc..3fddd8d 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java @@ -886,7 +886,7 @@ public class FileNodeProcessor extends Processor implements IStatistic { tsFileResource.getEndTime(entry.getKey()) >= entry.getValue() && tsFileResource.getStartTime(entry.getKey()) <= appendFile .getEndTime(entry.getKey())) { - String relativeFilePath = "postback" + File.separator + uuid + File.separator + "backup" + String relativeFilePath = "sync" + File.separator + uuid + File.separator + "backup" + File.separator + tsFileResource.getRelativePath(); File newFile = new File( Directories.getInstance().getTsFileFolder(tsFileResource.getBaseDirIndex()), diff --git a/iotdb/src/main/thrift/ServerService.thrift b/iotdb/src/main/java/org/apache/iotdb/db/exception/SyncConnectionException.java old mode 100755 new mode 100644 similarity index 61% copy from iotdb/src/main/thrift/ServerService.thrift copy to iotdb/src/main/java/org/apache/iotdb/db/exception/SyncConnectionException.java index cdcc47c..5b04e36 --- a/iotdb/src/main/thrift/ServerService.thrift +++ b/iotdb/src/main/java/org/apache/iotdb/db/exception/SyncConnectionException.java @@ -1,34 +1,41 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - namespace java org.apache.iotdb.db.postback.receiver - -typedef i32 int -typedef i16 short -typedef i64 long -service ServerService{ - bool getUUID(1:string uuid, 2:string address) - string startReceiving(1:string md5, 2:list<string> filename, 3:binary buff, 4:int status) - void getFileNodeInfo() - void mergeOldData(1:string path) - void mergeData() - void getSchema(1:binary buff, 2:int status) - bool merge() - void afterReceiving() - void init(1:string storageGroup) -} \ No newline at end of file +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.exception; + +/** + * @author Tianan Li + */ +public class SyncConnectionException extends Exception { + + + private static final long serialVersionUID = -6661904365503849681L; + + public SyncConnectionException(String message) { + super(message); + } + + public SyncConnectionException(String message, Throwable cause) { + super(message, cause); + } + + public SyncConnectionException(Throwable cause) { + super(cause); + } + +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/postback/sender/FileSenderImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/postback/sender/FileSenderImpl.java deleted file mode 100644 index 0b38d38..0000000 --- a/iotdb/src/main/java/org/apache/iotdb/db/postback/sender/FileSenderImpl.java +++ /dev/null @@ -1,510 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.db.postback.sender; - -import java.io.BufferedReader; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.FileReader; -import java.io.IOException; -import java.math.BigInteger; -import java.net.InetAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.nio.file.FileSystems; -import java.nio.file.Files; -import java.nio.file.Path; -import java.security.MessageDigest; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.UUID; -import org.apache.iotdb.db.postback.conf.PostBackSenderConfig; -import org.apache.iotdb.db.postback.conf.PostBackSenderDescriptor; -import org.apache.iotdb.db.postback.receiver.ServerService; -import org.apache.iotdb.db.utils.PostbackUtils; -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The class is to transfer tsfiles that needs to postback to receiver. - * - * @author lta - */ -public class FileSenderImpl implements FileSender { - - private static final Logger LOGGER = LoggerFactory.getLogger(FileSenderImpl.class); - private TTransport transport; - private ServerService.Client clientOfServer; - private List<String> schema = new ArrayList<>(); - private String uuid;// Mark the identity of sender - /** - * Mark whether connection of sender and receiver has broken down or not. - */ - private boolean connectionOrElse; - private PostBackSenderConfig config = PostBackSenderDescriptor.getInstance().getConfig(); - private Date lastPostBackTime = new Date(); // Mark the start time of last postback - private boolean postBackStatus = false; // If true, postback is in execution. - private static final String POST_BACK_SENDER_POSTBACK_PROCESS_HAS_FAILED = "IoTDB post back sender : postback process has failed!"; - private Map<String, Set<String>> sendingFileSnapshotList = new HashMap<>(); - - private FileSenderImpl() { - } - - public static final FileSenderImpl getInstance() { - return TransferHolder.INSTANCE; - } - - /** - * Create a sender and send files to the receiver. - * - * @param args not used - */ - public static void main(String[] args) throws InterruptedException { - FileSenderImpl fileSenderImpl = new FileSenderImpl(); - fileSenderImpl.verifyPort(); - Thread monitor = new Thread(fileSenderImpl::monitorPostbackStatus); - monitor.start(); - fileSenderImpl.timedTask(); - } - - public void setConfig(PostBackSenderConfig config) { - this.config = config; - } - - private void getConnection(String serverIP, int serverPort) { - connectToReceiver(serverIP, serverPort); - if (connectionOrElse && !transferUUID(config.getUuidPath())) { - LOGGER.error( - "IoTDB post back sender: Sorry! You do not have the permission to " - + "connect to postback receiver!"); - connectionOrElse = false; - - } - } - - /** - * Establish a connection between the sender and the receiver. - * - * @param serverIp the ip address of the receiver - * @param serverPort must be same with port receiver set. - */ - @Override - public void connectToReceiver(String serverIp, int serverPort) { - transport = new TSocket(serverIp, serverPort); - TProtocol protocol = new TBinaryProtocol(transport); - clientOfServer = new ServerService.Client(protocol); - try { - transport.open(); - } catch (TTransportException e) { - LOGGER.error("IoTDB post back sender: cannot connect to server", e); - connectionOrElse = false; - } - } - - /** - * UUID marks the identity of sender for receiver. - */ - @Override - public boolean transferUUID(String uuidPath) { - File file = new File(uuidPath); - if (!file.getParentFile().exists()) { - file.getParentFile().mkdirs(); - } - if (!file.exists()) { - try (FileOutputStream out = new FileOutputStream(file)) { - if (!file.createNewFile()) { - LOGGER.error("IoTDB post back sender: cannot create file {}", - file.getAbsoluteFile()); - } - uuid = "PB" + UUID.randomUUID().toString().replaceAll("-", ""); - out.write(uuid.getBytes()); - } catch (Exception e) { - LOGGER.error("IoTDB post back sender: cannot write UUID to file", e); - connectionOrElse = false; - } - } else { - try (BufferedReader bf = new BufferedReader((new FileReader(uuidPath)))) { - uuid = bf.readLine(); - } catch (IOException e) { - LOGGER.error("IoTDB post back sender: cannot read UUID from file", e); - connectionOrElse = false; - } - } - boolean legalConnectionOrNot = true; - try { - legalConnectionOrNot = clientOfServer.getUUID(uuid, - InetAddress.getLocalHost().getHostAddress()); - } catch (TException e) { - LOGGER.error("IoTDB post back sender: cannot send UUID to receiver", e); - connectionOrElse = false; - } catch (UnknownHostException e) { - LOGGER.error("IoTDB post back sender: unable to get local host", e); - legalConnectionOrNot = false; - } - return legalConnectionOrNot; - } - - /** - * Create snapshots for those sending files. - */ - @Override - public Set<String> makeFileSnapshot(Set<String> sendingFileList) { - Set<String> sendingSnapshotFileList = new HashSet<>(); - try { - for (String filePath : sendingFileList) { - String snapshotFilePath = PostbackUtils.getSnapshotFilePath(filePath); - sendingSnapshotFileList.add(snapshotFilePath); - File newFile = new File(snapshotFilePath); - if (!newFile.getParentFile().exists()) { - newFile.getParentFile().mkdirs(); - } - Path link = FileSystems.getDefault().getPath(snapshotFilePath); - Path target = FileSystems.getDefault().getPath(filePath); - Files.createLink(link, target); - } - } catch (IOException e) { - LOGGER.error("IoTDB post back sender: can not make fileSnapshot", e); - } - return sendingSnapshotFileList; - } - - /** - * Transfer data of a storage group to receiver. - * - * @param fileSnapshotList list of sending snapshot files in a storage group. - */ - @Override - public void transferData(Set<String> fileSnapshotList) { - try { - int num = 0; - for (String snapshotFilePath : fileSnapshotList) { - num++; - File file = new File(snapshotFilePath); - List<String> filePathSplit = new ArrayList<>(); - String os = System.getProperty("os.name"); - if (os.toLowerCase().startsWith("windows")) { - String[] name = snapshotFilePath.split(File.separator + File.separator); - filePathSplit.add("data"); - filePathSplit.add(name[name.length - 2]); - filePathSplit.add(name[name.length - 1]); - } else { - String[] name = snapshotFilePath.split(File.separator); - filePathSplit.add("data"); - filePathSplit.add(name[name.length - 2]); - filePathSplit.add(name[name.length - 1]); - } - while (true) { - // Send all data to receiver - try (FileInputStream fis = new FileInputStream(file)) { - int mBufferSize = 64 * 1024 * 1024; - ByteArrayOutputStream bos = new ByteArrayOutputStream(mBufferSize); - byte[] buffer = new byte[mBufferSize]; - int n; - while ((n = fis.read(buffer)) != -1) { // cut the file into pieces to send - bos.write(buffer, 0, n); - ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray()); - bos.reset(); - clientOfServer.startReceiving(null, filePathSplit, buffToSend, 1); - } - bos.close(); - } - - // Get md5 of the file. - MessageDigest md = MessageDigest.getInstance("MD5"); - try (FileInputStream fis = new FileInputStream(file)) { - int mBufferSize = 8 * 1024 * 1024; - byte[] buffer = new byte[mBufferSize]; - int m; - while ((m = fis.read(buffer)) != -1) { - md.update(buffer, 0, m); - } - } - - // the file is sent successfully - String md5OfSender = (new BigInteger(1, md.digest())).toString(16); - String md5OfReceiver = clientOfServer.startReceiving(md5OfSender, filePathSplit, - null, 0); - if (md5OfSender.equals(md5OfReceiver)) { - LOGGER.info("IoTDB sender: receiver has received {} successfully.", snapshotFilePath); - break; - } - } - LOGGER.info(String - .format("IoTDB sender : Task of sending files to receiver has completed %d/%d.", num, - fileSnapshotList.size())); - } - } catch (TException e) { - LOGGER.error("IoTDB post back sender: cannot sending data because receiver has broken down."); - connectionOrElse = false; - } catch (Exception e) { - LOGGER.error("IoTDB post back sender: cannot sending data ", e); - connectionOrElse = false; - } - } - - /** - * Sending schema to receiver. - * - * @param schemaPath the path of the schema file. - */ - @Override - public void sendSchema(String schemaPath) { - try (FileInputStream fis = new FileInputStream(new File(schemaPath))) { - int mBufferSize = 4 * 1024 * 1024; - ByteArrayOutputStream bos = new ByteArrayOutputStream(mBufferSize); - byte[] buffer = new byte[mBufferSize]; - int n; - while ((n = fis.read(buffer)) != -1) { // cut the file into pieces to send - bos.write(buffer, 0, n); - ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray()); - bos.reset(); - // 1 represents there is still schema buffer to send. - clientOfServer.getSchema(buffToSend, 1); - } - bos.close(); - // 0 represents the schema file has been transferred completely. - clientOfServer.getSchema(null, 0); - } catch (Exception e) { - LOGGER.error("IoTDB post back sender : cannot send schema from mlog.txt ", e); - connectionOrElse = false; - } - } - - @Override - public boolean afterSending() { - boolean successOrNot = false; - try { - successOrNot = clientOfServer.merge(); - } catch (TException e) { - LOGGER.error( - "IoTDB post back sender : can not finish postback process because postback " - + "receiver has broken down."); - transport.close(); - } - return successOrNot; - } - - public List<String> getSchema() { - return schema; - } - - /** - * The method is to verify whether the client port is bind or not, ensuring that only one client - * is running. - */ - private void verifyPort() { - try { - Socket socket = new Socket("localhost", config.getClientPort()); - socket.close(); - LOGGER.error("The postback client has been started!"); - System.exit(0); - } catch (IOException e) { - try { - ServerSocket listenerSocket = new ServerSocket(config.getClientPort()); - Thread listener = new Thread(() -> { - while (true) { - try { - listenerSocket.accept(); - } catch (IOException e2) { - LOGGER.error("IoTDB post back sender: unable to listen to port{}", - config.getClientPort(), e2); - } - } - }); - listener.start(); - } catch (IOException e1) { - LOGGER.error("IoTDB post back sender: unable to listen to port{}", - config.getClientPort(), e1); - } - } - } - - /** - * Monitor postback status. - */ - private void monitorPostbackStatus() { - Date oldTime = new Date(); - while (true) { - Date currentTime = new Date(); - if (currentTime.getTime() / 1000 == oldTime.getTime() / 1000) { - continue; - } - if ((currentTime.getTime() - lastPostBackTime.getTime()) - % (config.getUploadCycleInSeconds() * 1000) == 0) { - oldTime = currentTime; - if (postBackStatus) { - LOGGER.info("IoTDB post back sender : postback process is in execution!"); - } - } - } - } - - /** - * Start postback task in a certain time. - */ - public void timedTask() throws InterruptedException { - postback(); - lastPostBackTime = new Date(); - Date currentTime; - while (true) { - Thread.sleep(2000); - currentTime = new Date(); - if (currentTime.getTime() - lastPostBackTime.getTime() - > config.getUploadCycleInSeconds() * 1000) { - lastPostBackTime = currentTime; - postback(); - } - } - } - - /** - * Execute a postback task. - */ - @Override - public void postback() { - - for (String snapshotPath : config.getSnapshotPaths()) { - if (new File(snapshotPath).exists() && new File(snapshotPath).list().length != 0) { - // it means that the last task of postback does not succeed! Clear the files and - // start to postback again - try { - PostbackUtils.deleteFile(new File(snapshotPath)); - } catch (IOException e) { - LOGGER.error("can not delete file {}", snapshotPath, e); - } - } - } - - postBackStatus = true; - connectionOrElse = true; - - // connect to postback server - getConnection(config.getServerIp(), config.getServerPort()); - if (!connectionOrElse) { - LOGGER.info(POST_BACK_SENDER_POSTBACK_PROCESS_HAS_FAILED); - postBackStatus = false; - return; - } - - FileManager fileManager = FileManager.getInstance(); - fileManager.init(); - Map<String, Set<String>> sendingFileList = fileManager.getSendingFiles(); - Map<String, Set<String>> nowLocalFileList = fileManager.getCurrentLocalFiles(); - if (PostbackUtils.isEmpty(sendingFileList)) { - LOGGER.info("IoTDB post back sender : there has no file to postback !"); - postBackStatus = false; - return; - } - - // create snapshot - for (Entry<String, Set<String>> entry : sendingFileList.entrySet()) { - sendingFileSnapshotList.put(entry.getKey(), makeFileSnapshot(entry.getValue())); - } - - sendSchema(config.getSchemaPath()); - if (!connectionOrElse) { - transport.close(); - LOGGER.info(POST_BACK_SENDER_POSTBACK_PROCESS_HAS_FAILED); - postBackStatus = false; - return; - } - for (Entry<String, Set<String>> entry : sendingFileList.entrySet()) { - Set<String> sendingList = entry.getValue(); - Set<String> sendingSnapshotList = sendingFileSnapshotList.get(entry.getKey()); - if (sendingSnapshotList.isEmpty()) { - continue; - } - LOGGER.info("IoTDB post back sender : postback process starts to transfer data of " - + "storage group {}.", entry.getKey()); - try { - clientOfServer.init(entry.getKey()); - } catch (TException e) { - connectionOrElse = false; - LOGGER.error("IoTDB post back sender : unable to connect to receiver", e); - } - if (!connectionOrElse) { - transport.close(); - LOGGER.info(POST_BACK_SENDER_POSTBACK_PROCESS_HAS_FAILED); - postBackStatus = false; - return; - } - transferData(sendingSnapshotList); - if (!connectionOrElse) { - transport.close(); - LOGGER.info(POST_BACK_SENDER_POSTBACK_PROCESS_HAS_FAILED); - postBackStatus = false; - return; - } - if (afterSending()) { - nowLocalFileList.get(entry.getKey()).addAll(sendingList); - fileManager.setCurrentLocalFiles(nowLocalFileList); - fileManager.backupNowLocalFileInfo(config.getLastFileInfo()); - LOGGER.info("IoTDB post back sender : the postBack has finished storage group {}.", - entry.getKey()); - } else { - LOGGER.info(POST_BACK_SENDER_POSTBACK_PROCESS_HAS_FAILED); - postBackStatus = false; - return; - } - } - for (String snapshotPath : config.getSnapshotPaths()) { - try { - PostbackUtils.deleteFile(new File(snapshotPath)); - } catch (IOException e) { - LOGGER.error(" ", e); - } - } - try { - clientOfServer.afterReceiving(); - } catch (TException e) { - connectionOrElse = false; - LOGGER.error("IoTDB post back sender : unable to connect to receiver ", e); - } - if (!connectionOrElse) { - transport.close(); - LOGGER.info(POST_BACK_SENDER_POSTBACK_PROCESS_HAS_FAILED); - postBackStatus = false; - return; - } - transport.close(); - LOGGER.info("IoTDB post back sender : postback process has finished!"); - postBackStatus = false; - } - - private static class TransferHolder { - - private static final FileSenderImpl INSTANCE = new FileSenderImpl(); - } -} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java index 9689166..fead066 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java @@ -33,7 +33,7 @@ import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.exception.builder.ExceptionBuilder; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.monitor.StatMonitor; -import org.apache.iotdb.db.postback.receiver.ServerManager; +import org.apache.iotdb.db.sync.receiver.ServerManager; import org.apache.iotdb.db.query.control.FileReaderManager; import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager; import org.apache.iotdb.db.writelog.manager.WriteLogNodeManager; diff --git a/iotdb/src/main/thrift/ServerService.thrift b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java old mode 100755 new mode 100644 similarity index 58% copy from iotdb/src/main/thrift/ServerService.thrift copy to iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java index cdcc47c..31e0ad4 --- a/iotdb/src/main/thrift/ServerService.thrift +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java @@ -1,34 +1,41 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - namespace java org.apache.iotdb.db.postback.receiver - -typedef i32 int -typedef i16 short -typedef i64 long -service ServerService{ - bool getUUID(1:string uuid, 2:string address) - string startReceiving(1:string md5, 2:list<string> filename, 3:binary buff, 4:int status) - void getFileNodeInfo() - void mergeOldData(1:string path) - void mergeData() - void getSchema(1:binary buff, 2:int status) - bool merge() - void afterReceiving() - void init(1:string storageGroup) -} \ No newline at end of file +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.sync.conf; + +/** + * @author Tianan Li + */ +public class Constans { + + private Constans() { + } + + public static final String CONFIG_NAME = "iotdb-sync-client.properties"; + public static final String SYNC = "sync"; + + public static final String UUID_FILE_NAME = "uuid.txt"; + public static final String LAST_LOCAL_FILE_NAME = "last_local_files.txt"; + public static final String DATA_SNAPSHOT_NAME = "data_snapshot"; + + /** + * Split data file , block size at each transmission + **/ + public static final int DATA_CHUNK_SIZE = 64 * 1024 * 1024; + +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/postback/conf/PostBackSenderConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java similarity index 64% rename from iotdb/src/main/java/org/apache/iotdb/db/postback/conf/PostBackSenderConfig.java rename to iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java index b0d1894..0def77c 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/postback/conf/PostBackSenderConfig.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java @@ -16,35 +16,59 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.postback.conf; +package org.apache.iotdb.db.sync.conf; import java.io.File; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.metadata.MetadataConstant; /** - * @author lta + * @author Tianan Li */ -public class PostBackSenderConfig { - - public static final String CONFIG_NAME = "iotdb-postbackClient.properties"; +public class SyncSenderConfig { private String[] iotdbBufferwriteDirectory = IoTDBDescriptor.getInstance().getConfig() .getBufferWriteDirs(); - private String dataDirectory = - new File(IoTDBDescriptor.getInstance().getConfig().getDataDir()).getAbsolutePath() - + File.separator; + private String dataDirectory = IoTDBDescriptor.getInstance().getConfig().getDataDir(); private String uuidPath; private String lastFileInfo; private String[] snapshotPaths; - private String schemaPath = - new File(IoTDBDescriptor.getInstance().getConfig().getMetadataDir()).getAbsolutePath() - + File.separator + "mlog.txt"; + private String schemaPath; private String serverIp = "127.0.0.1"; private int serverPort = 5555; private int clientPort = 6666; private int uploadCycleInSeconds = 10; private boolean clearEnable = false; + public void init() { + String metadataDirPath = IoTDBDescriptor.getInstance().getConfig().getMetadataDir(); + if (metadataDirPath.length() > 0 + && metadataDirPath.charAt(metadataDirPath.length() - 1) != File.separatorChar) { + metadataDirPath = metadataDirPath + File.separatorChar; + } + schemaPath = metadataDirPath + MetadataConstant.METADATA_LOG; + if (dataDirectory.length() > 0 + && dataDirectory.charAt(dataDirectory.length() - 1) != File.separatorChar) { + dataDirectory += File.separatorChar; + } + uuidPath = dataDirectory + Constans.SYNC + File.separatorChar + Constans.UUID_FILE_NAME; + lastFileInfo = + dataDirectory + Constans.SYNC + File.separatorChar + Constans.LAST_LOCAL_FILE_NAME; + snapshotPaths = new String[iotdbBufferwriteDirectory.length]; + for (int i = 0; i < iotdbBufferwriteDirectory.length; i++) { + iotdbBufferwriteDirectory[i] = new File(iotdbBufferwriteDirectory[i]).getAbsolutePath(); + if (iotdbBufferwriteDirectory[i].length() > 0 + && iotdbBufferwriteDirectory[i].charAt(iotdbBufferwriteDirectory[i].length() - 1) + != File.separatorChar) { + iotdbBufferwriteDirectory[i] = iotdbBufferwriteDirectory[i] + File.separatorChar; + } + snapshotPaths[i] = iotdbBufferwriteDirectory[i] + Constans.SYNC + File.separatorChar + + Constans.DATA_SNAPSHOT_NAME + + File.separatorChar; + } + + } + public String[] getIotdbBufferwriteDirectory() { return iotdbBufferwriteDirectory; } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/postback/conf/PostBackSenderDescriptor.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java similarity index 59% rename from iotdb/src/main/java/org/apache/iotdb/db/postback/conf/PostBackSenderDescriptor.java rename to iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java index d4f5983..04a5e7d 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/postback/conf/PostBackSenderDescriptor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.postback.conf; +package org.apache.iotdb.db.sync.conf; import java.io.File; import java.io.FileInputStream; @@ -29,53 +29,61 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * @author lta + * @author Tianan Li */ -public class PostBackSenderDescriptor { +public class SyncSenderDescriptor { - private static final Logger LOGGER = LoggerFactory.getLogger(PostBackSenderDescriptor.class); - private static final String POSTBACK = "postback"; - private PostBackSenderConfig conf = new PostBackSenderConfig(); + private static final Logger LOGGER = LoggerFactory.getLogger(SyncSenderDescriptor.class); + private SyncSenderConfig conf = new SyncSenderConfig(); - private PostBackSenderDescriptor() { + private SyncSenderDescriptor() { loadProps(); } - public static final PostBackSenderDescriptor getInstance() { + public static final SyncSenderDescriptor getInstance() { return PostBackDescriptorHolder.INSTANCE; } - public PostBackSenderConfig getConfig() { + public SyncSenderConfig getConfig() { return conf; } - public void setConfig(PostBackSenderConfig conf) { + public void setConfig(SyncSenderConfig conf) { this.conf = conf; } /** - * load an properties file and set TsfileDBConfig variables + * load an properties file and set sync config variables */ private void loadProps() { + conf.init(); + InputStream inputStream; String url = System.getProperty(IoTDBConstant.IOTDB_CONF, null); if (url == null) { url = System.getProperty(IoTDBConstant.IOTDB_HOME, null); if (url != null) { url = url + File.separatorChar + "conf" + File.separatorChar - + PostBackSenderConfig.CONFIG_NAME; + + Constans.CONFIG_NAME; } else { LOGGER.warn( "Cannot find IOTDB_HOME or IOTDB_CONF environment variable when loading config file {}, use default configuration", - PostBackSenderConfig.CONFIG_NAME); + Constans.CONFIG_NAME); return; } } else { - url += (File.separatorChar + PostBackSenderConfig.CONFIG_NAME); + url += (File.separatorChar + Constans.CONFIG_NAME); } - LOGGER.info("Start to read config file {}", url); + try { + inputStream = new FileInputStream(new File(url)); + } catch (FileNotFoundException e) { + LOGGER.warn("Fail to find sync config file {}", url, e); + return; + } + + LOGGER.info("Start to read sync config file {}", url); Properties properties = new Properties(); - try (InputStream inputStream = new FileInputStream(new File(url))) { + try { properties.load(inputStream); conf.setServerIp(properties.getProperty("server_ip", conf.getServerIp())); @@ -88,21 +96,28 @@ public class PostBackSenderDescriptor { .getProperty("upload_cycle_in_seconds", Integer.toString(conf.getUploadCycleInSeconds())))); conf.setSchemaPath(properties.getProperty("iotdb_schema_directory", conf.getSchemaPath())); - conf.setClearEnable(Boolean - .parseBoolean( - properties.getProperty("is_clear_enable", Boolean.toString(conf.getClearEnable())))); - conf.setUuidPath(conf.getDataDirectory() + POSTBACK + File.separator + "uuid.txt"); + conf.setDataDirectory( + properties.getProperty("iotdb_bufferWrite_directory", conf.getDataDirectory())); + String dataDirectory = conf.getDataDirectory(); + if (dataDirectory.length() > 0 + && dataDirectory.charAt(dataDirectory.length() - 1) != File.separatorChar) { + dataDirectory += File.separatorChar; + } + conf.setUuidPath( + dataDirectory + Constans.SYNC + File.separatorChar + Constans.UUID_FILE_NAME); conf.setLastFileInfo( - conf.getDataDirectory() + POSTBACK + File.separator + "lastLocalFileList.txt"); + dataDirectory + Constans.SYNC + File.separatorChar + Constans.LAST_LOCAL_FILE_NAME); String[] iotdbBufferwriteDirectory = conf.getIotdbBufferwriteDirectory(); String[] snapshots = new String[conf.getIotdbBufferwriteDirectory().length]; for (int i = 0; i < conf.getIotdbBufferwriteDirectory().length; i++) { - iotdbBufferwriteDirectory[i] = new File(iotdbBufferwriteDirectory[i]).getAbsolutePath(); - if (!iotdbBufferwriteDirectory[i].endsWith(File.separator)) { - iotdbBufferwriteDirectory[i] = iotdbBufferwriteDirectory[i] + File.separator; + if (iotdbBufferwriteDirectory[i].length() > 0 + && iotdbBufferwriteDirectory[i].charAt(iotdbBufferwriteDirectory[i].length() - 1) + != File.separatorChar) { + iotdbBufferwriteDirectory[i] = iotdbBufferwriteDirectory[i] + File.separatorChar; } - snapshots[i] = iotdbBufferwriteDirectory[i] + POSTBACK + File.separator + "dataSnapshot" - + File.separator; + snapshots[i] = iotdbBufferwriteDirectory[i] + Constans.SYNC + File.separatorChar + + Constans.DATA_SNAPSHOT_NAME + + File.separatorChar; } conf.setIotdbBufferwriteDirectory(iotdbBufferwriteDirectory); conf.setSnapshotPaths(snapshots); @@ -110,11 +125,19 @@ public class PostBackSenderDescriptor { LOGGER.warn("Cannot load config file because {}, use default configuration", e); } catch (Exception e) { LOGGER.warn("Error format in config file because {}, use default configuration", e); + } finally { + if (inputStream != null) { + try { + inputStream.close(); + } catch (IOException e) { + LOGGER.error("Fail to close sync config file input stream because ", e); + } + } } } private static class PostBackDescriptorHolder { - private static final PostBackSenderDescriptor INSTANCE = new PostBackSenderDescriptor(); + private static final SyncSenderDescriptor INSTANCE = new SyncSenderDescriptor(); } } \ No newline at end of file diff --git a/iotdb/src/main/java/org/apache/iotdb/db/postback/receiver/ServerManager.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerManager.java similarity index 81% rename from iotdb/src/main/java/org/apache/iotdb/db/postback/receiver/ServerManager.java rename to iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerManager.java index 0a82d71..a8baf2f 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/postback/receiver/ServerManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerManager.java @@ -16,8 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.postback.receiver; +package org.apache.iotdb.db.sync.receiver; +import org.apache.iotdb.db.concurrent.ThreadName; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StartupException; @@ -32,9 +33,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * receiver server. + * sync receiver server. * - * @author lta + * @author Tianan Li */ public class ServerManager { @@ -51,7 +52,7 @@ public class ServerManager { } /** - * start postback receiver's server. + * start sync receiver's server. */ public void startServer() throws StartupException { Factory protocolFactory; @@ -63,8 +64,7 @@ public class ServerManager { try { if (conf.getIpWhiteList() == null) { LOGGER.error( - "IoTDB post back receiver: Postback server failed to start because IP white " - + "list is null, please set IP white list!"); + "Sync server failed to start because IP white list is null, please set IP white list."); return; } conf.setIpWhiteList(conf.getIpWhiteList().replaceAll(" ", "")); @@ -75,23 +75,23 @@ public class ServerManager { poolArgs.processor(processor); poolArgs.protocolFactory(protocolFactory); poolServer = new TThreadPoolServer(poolArgs); - LOGGER.info("Postback server has started."); - Runnable runnable = () -> poolServer.serve(); - Thread thread = new Thread(runnable); - thread.start(); + LOGGER.info("Sync server has started."); + Runnable syncServerRunnable = () -> poolServer.serve(); + Thread syncServerThread = new Thread(syncServerRunnable, ThreadName.SYNC_SERVER.getName()); + syncServerThread.start(); } catch (TTransportException e) { - throw new StartupException("IoTDB post back receiver: cannot start postback server.", e); + throw new StartupException("cannot start sync server.", e); } } /** - * close postback receiver's server. + * close sync receiver's server. */ public void closeServer() { if (conf.isPostbackEnable() && poolServer != null) { poolServer.stop(); serverTransport.close(); - LOGGER.info("Stop postback server."); + LOGGER.info("stop sync server."); } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/postback/receiver/ServerServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java similarity index 65% rename from iotdb/src/main/java/org/apache/iotdb/db/postback/receiver/ServerServiceImpl.java rename to iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java index 7bbdb16..de2b2a3 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/postback/receiver/ServerServiceImpl.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.postback.receiver; +package org.apache.iotdb.db.sync.receiver; import java.io.BufferedReader; import java.io.File; @@ -28,26 +28,36 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.security.MessageDigest; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.*; - +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; import org.apache.iotdb.db.conf.IoTDBConfig; -import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.directories.Directories; import org.apache.iotdb.db.engine.filenode.FileNodeManager; -import org.apache.iotdb.db.engine.filenode.TsFileResource; import org.apache.iotdb.db.engine.filenode.OverflowChangeType; +import org.apache.iotdb.db.engine.filenode.TsFileResource; import org.apache.iotdb.db.exception.FileNodeManagerException; -import org.apache.iotdb.db.utils.PostbackUtils; +import org.apache.iotdb.db.exception.MetadataArgsErrorException; +import org.apache.iotdb.db.exception.PathErrorException; +import org.apache.iotdb.db.exception.ProcessorException; +import org.apache.iotdb.db.metadata.MManager; +import org.apache.iotdb.db.metadata.MetadataOperationType; +import org.apache.iotdb.db.qp.executor.OverflowQPExecutor; +import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.utils.SyncUtils; import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData; import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata; import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.ReadOnlyTsFile; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.Field; @@ -60,14 +70,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * @author lta + * @author Tianan Li */ public class ServerServiceImpl implements ServerService.Iface { private static final Logger logger = LoggerFactory.getLogger(ServerServiceImpl.class); private static final FileNodeManager fileNodeManager = FileNodeManager.getInstance(); - private static final String JDBC_DRIVER_NAME = "org.apache.iotdb.jdbc.IoTDBDriver"; - private static final String POSTBACK = "postback"; + private static final MManager metadataManger = MManager.getInstance(); + private static final String POSTBACK = "sync"; private ThreadLocal<String> uuid = new ThreadLocal<>(); // String means Storage Group,List means the set of new Files(AbsulutePath) in local IoTDB // String means AbsulutePath of new Files @@ -81,7 +91,8 @@ public class ServerServiceImpl implements ServerService.Iface { private IoTDBConfig tsfileDBconfig = IoTDBDescriptor.getInstance().getConfig(); private String postbackPath; // Absolute seriesPath of IoTDB data directory - private String dataPath = new File(tsfileDBconfig.getDataDir()).getAbsolutePath() + File.separator; + private String dataPath = + new File(tsfileDBconfig.getDataDir()).getAbsolutePath() + File.separator; // Absolute paths of IoTDB bufferWrite directory private String[] bufferWritePaths = tsfileDBconfig.getBufferWriteDirs(); private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); @@ -91,9 +102,11 @@ public class ServerServiceImpl implements ServerService.Iface { */ @Override public void init(String storageGroup) { - logger.info(String.format( - "IoTDB post back receiver: postback process starts to receive data of storage group {}."), - storageGroup); + if (logger.isInfoEnabled()) { + logger.info( + "IoTDB post back receiver: sync process starts to receive data of storage group {}", + storageGroup); + } fileNum.set(0); fileNodeMap.set(new HashMap<>()); fileNodeStartTime.set(new HashMap<>()); @@ -111,7 +124,7 @@ public class ServerServiceImpl implements ServerService.Iface { if (new File(postbackPath + this.uuid.get()).exists() && new File(postbackPath + this.uuid.get()).list().length != 0) { try { - PostbackUtils.deleteFile(new File(postbackPath + this.uuid.get())); + SyncUtils.deleteFile(new File(postbackPath + this.uuid.get())); } catch (IOException e) { throw new TException(e); } @@ -120,16 +133,16 @@ public class ServerServiceImpl implements ServerService.Iface { String backupPath = bufferWritePath + POSTBACK + File.separator; if (new File(backupPath + this.uuid.get()).exists() && new File(backupPath + this.uuid.get()).list().length != 0) { - // if does not exist, it means that the last time postback failed, clear uuid + // if does not exist, it means that the last time sync failed, clear uuid // data and receive the data again try { - PostbackUtils.deleteFile(new File(backupPath + this.uuid.get())); + SyncUtils.deleteFile(new File(backupPath + this.uuid.get())); } catch (IOException e) { throw new TException(e); } } } - return PostbackUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress); + return SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress); } /** @@ -143,12 +156,12 @@ public class ServerServiceImpl implements ServerService.Iface { ByteBuffer dataToReceive, int status) throws TException { String md5OfReceiver = ""; StringBuilder filePathBuilder = new StringBuilder(); - FileChannel channel = null; + FileChannel channel; for (int i = 0; i < filePathSplit.size(); i++) { if (i == filePathSplit.size() - 1) { - filePathBuilder = filePathBuilder.append(filePathSplit.get(i)); + filePathBuilder.append(filePathSplit.get(i)); } else { - filePathBuilder = filePathBuilder.append(filePathSplit.get(i)).append(File.separator); + filePathBuilder.append(filePathSplit.get(i)).append(File.separator); } } String filePath = filePathBuilder.toString(); @@ -184,11 +197,16 @@ public class ServerServiceImpl implements ServerService.Iface { md5OfReceiver = (new BigInteger(1, md.digest())).toString(16); if (md5OfSender.equals(md5OfReceiver)) { fileNum.set(fileNum.get() + 1); - logger.info(String - .format("IoTDB post back receiver : Receiver has received %d files from sender!", - fileNum.get())); + if (logger.isInfoEnabled()) { + logger.info(String + .format("IoTDB post back receiver : Receiver has received %d files from sender", + fileNum.get())); + } } else { - new File(filePath).delete(); + if (!new File(filePath).delete()) { + logger.error("IoTDB post back receiver : Receiver can not delete file {}", + new File(filePath).getAbsolutePath()); + } } } catch (Exception e) { logger.error("IoTDB post back receiver: cannot generate md5", e); @@ -205,58 +223,9 @@ public class ServerServiceImpl implements ServerService.Iface { */ @Override public void getSchema(ByteBuffer schema, int status) { - FileOutputStream fos; - FileChannel channel; if (status == 0) { - Statement statement = null; - try (Connection connection = DriverManager.getConnection("jdbc:iotdb://localhost:" + - config.getRpcPort() + "/", IoTDBConstant.ADMIN_NAME, IoTDBConstant.ADMIN_PW)) { - Class.forName(JDBC_DRIVER_NAME); - statement = connection.createStatement(); - - try (BufferedReader bf = new BufferedReader( - new java.io.FileReader(schemaFromSenderPath.get()))) { - String data; - statement.clearBatch(); - int count = 0; - while ((data = bf.readLine()) != null) { - String[] item = data.split(","); - if (item[0].equals("2")) { - String sql = String.format("SET STORAGE GROUP TO %s", item[1]); - statement.addBatch(sql); - } else if (item[0].equals("0")) { - String sql = String - .format("CREATE TIMESERIES %s WITH DATATYPE=%s, ENCODING=%s", item[1], item[2], - item[3]); - statement.addBatch(sql); - } - count++; - if (count > 10000) { - statement.executeBatch(); - statement.clearBatch(); - count = 0; - } - } - } catch (FileNotFoundException e) { - logger.error("IoTDB post back receiver: cannot read the file {}.", - schemaFromSenderPath.get(), e); - } catch (IOException e) { - logger.error("IoTDB post back receiver: cannot insert schema to IoTDB.", e); - } - - statement.executeBatch(); - statement.clearBatch(); - } catch (SQLException | ClassNotFoundException e) { - logger.error("IoTDB post back receiver: jdbc can not connect to IoTDB.", e); - } finally { - try { - if (statement != null) { - statement.close(); - } - } catch (SQLException e) { - logger.error("IoTDB post back receiver : can not close JDBC connection.", e); - } - } + /** sync metadata, include storage group and timeseries **/ + syncMetadata(); } else { File file = new File(schemaFromSenderPath.get()); if (!file.getParentFile().exists()) { @@ -270,18 +239,84 @@ public class ServerServiceImpl implements ServerService.Iface { logger.error("IoTDB post back receiver: cannot make schema file.", e); } } - try { - fos = new FileOutputStream(file, true); - channel = fos.getChannel(); + try (FileOutputStream fos = new FileOutputStream(file, true); + FileChannel channel = fos.getChannel()) { channel.write(schema); - channel.close(); - fos.close(); } catch (Exception e) { - logger.error("IoTDB post back receiver: cannot write data to file.", - e); + logger.error("IoTDB post back receiver: cannot write data to file.", e); + } + } + + } + + /** + * Sync metadata with sender + */ + private void syncMetadata() { + if (new File(schemaFromSenderPath.get()).exists()) { + try (BufferedReader br = new BufferedReader( + new java.io.FileReader(schemaFromSenderPath.get()))) { + String metadataOperation; + while ((metadataOperation = br.readLine()) != null) { + operation(metadataOperation); + } + } catch (FileNotFoundException e) { + logger.error("IoTDB post back receiver: cannot read the file {}.", + schemaFromSenderPath.get(), e); + } catch (IOException e) { + logger.error("IoTDB post back receiver: cannot insert schema to IoTDB.", e); + } catch (Exception e) { + logger.error("IoTDB post back receiver: parse metadata operation failed.", e); } } + } + /** + * Operate metadata operation in MManager + * + * @param cmd metadata operation + */ + private void operation(String cmd) + throws PathErrorException, IOException, MetadataArgsErrorException { + String[] args = cmd.trim().split(","); + switch (args[0]) { + case MetadataOperationType.ADD_PATH_TO_MTREE: + Map<String, String> props = null; + String[] kv; + props = new HashMap<>(args.length - 5 + 1, 1); + for (int k = 5; k < args.length; k++) { + kv = args[k].split("="); + props.put(kv[0], kv[1]); + } + metadataManger.addPathToMTree(args[1], TSDataType.deserialize(Short.valueOf(args[2])), + TSEncoding.deserialize(Short.valueOf(args[3])), + CompressionType.deserialize(Short.valueOf(args[4])), + props); + break; + case MetadataOperationType.DELETE_PATH_FROM_MTREE: + metadataManger.deletePathFromMTree(args[1]); + break; + case MetadataOperationType.SET_STORAGE_LEVEL_TO_MTREE: + metadataManger.setStorageLevelToMTree(args[1]); + break; + case MetadataOperationType.ADD_A_PTREE: + metadataManger.addAPTree(args[1]); + break; + case MetadataOperationType.ADD_A_PATH_TO_PTREE: + metadataManger.addPathToPTree(args[1]); + break; + case MetadataOperationType.DELETE_PATH_FROM_PTREE: + metadataManger.deletePathFromPTree(args[1]); + break; + case MetadataOperationType.LINK_MNODE_TO_PTREE: + metadataManger.linkMNodeToPTree(args[1], args[2]); + break; + case MetadataOperationType.UNLINK_MNODE_FROM_PTREE: + metadataManger.unlinkMNodeFromPTree(args[1], args[2]); + break; + default: + logger.error("Unrecognizable command {}", cmd); + } } @Override @@ -289,7 +324,7 @@ public class ServerServiceImpl implements ServerService.Iface { getFileNodeInfo(); mergeData(); try { - PostbackUtils.deleteFile(new File(postbackPath + this.uuid.get())); + SyncUtils.deleteFile(new File(postbackPath + this.uuid.get())); } catch (IOException e) { throw new TException(e); } @@ -297,10 +332,10 @@ public class ServerServiceImpl implements ServerService.Iface { String backupPath = bufferWritePath + POSTBACK + File.separator; if (new File(backupPath + this.uuid.get()).exists() && new File(backupPath + this.uuid.get()).list().length != 0) { - // if does not exist, it means that the last time postback process failed, clear + // if does not exist, it means that the last time sync process failed, clear // uuid data and receive the data again try { - PostbackUtils.deleteFile(new File(backupPath + this.uuid.get())); + SyncUtils.deleteFile(new File(backupPath + this.uuid.get())); } catch (IOException e) { throw new TException(e); } @@ -366,11 +401,13 @@ public class ServerServiceImpl implements ServerService.Iface { fileNodeEndTime.get().put(fileTF.getAbsolutePath(), endTimeMap); filesPath.add(fileTF.getAbsolutePath()); num++; - logger.info(String - .format("IoTDB receiver : Getting FileNode Info has complete : %d/%d", num, - fileNum.get())); + if (logger.isInfoEnabled()) { + logger.info(String + .format("IoTDB receiver : Getting FileNode Info has complete : %d/%d", num, + fileNum.get())); + } + fileNodeMap.get().put(storageGroupPB.getName(), filesPath); } - fileNodeMap.get().put(storageGroupPB.getName(), filesPath); } } @@ -379,78 +416,67 @@ public class ServerServiceImpl implements ServerService.Iface { */ @Override public void mergeOldData(String filePath) throws TException { - Set<String> timeseries = new HashSet<>(); - Statement statement = null; + Set<String> timeseriesSet = new HashSet<>(); TsFileSequenceReader reader = null; - try (Connection connection = DriverManager.getConnection( - String.format("jdbc:iotdb://localhost:%d/", config.getRpcPort()), "root", - "root")) { - Class.forName(JDBC_DRIVER_NAME); - statement = connection.createStatement(); - int count = 0; - + OverflowQPExecutor insertExecutor = new OverflowQPExecutor(); + try { + /** use tsfile reader to get data **/ reader = new TsFileSequenceReader(filePath); Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap(); - Iterator<String> it = deviceIdMap.keySet().iterator(); - while (it.hasNext()) { - String deviceId = it.next(); // deviceId represent devices - TsDeviceMetadataIndex deviceMetadataIndex = deviceIdMap.get(deviceId); - TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(deviceMetadataIndex); + Iterator<Entry<String, TsDeviceMetadataIndex>> entryIterator = deviceIdMap.entrySet() + .iterator(); + while (entryIterator.hasNext()) { + Entry<String, TsDeviceMetadataIndex> deviceMIEntry = entryIterator.next(); + String deviceId = deviceMIEntry.getKey(); + TsDeviceMetadataIndex deviceMI = deviceMIEntry.getValue(); + TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(deviceMI); List<ChunkGroupMetaData> rowGroupMetadataList = deviceMetadata.getChunkGroupMetaDataList(); - timeseries.clear(); - // firstly, get all timeseries in the same device + timeseriesSet.clear(); + /** firstly, get all timeseries in the same device **/ for (ChunkGroupMetaData chunkGroupMetaData : rowGroupMetadataList) { List<ChunkMetaData> chunkMetaDataList = chunkGroupMetaData .getChunkMetaDataList(); for (ChunkMetaData chunkMetaData : chunkMetaDataList) { String measurementUID = chunkMetaData.getMeasurementUid(); measurementUID = deviceId + "." + measurementUID; - timeseries.add(measurementUID); + timeseriesSet.add(measurementUID); } } - // secondly, use tsFile Reader to form SQL - + /** Secondly, use tsFile Reader to form SQL **/ ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader); List<Path> paths = new ArrayList<>(); paths.clear(); - for (String timesery : timeseries) { - paths.add(new Path(timesery)); + for (String timeseries : timeseriesSet) { + paths.add(new Path(timeseries)); } QueryExpression queryExpression = QueryExpression.create(paths, null); QueryDataSet queryDataSet = readTsFile.query(queryExpression); + InsertPlan insertPlan; while (queryDataSet.hasNext()) { RowRecord record = queryDataSet.next(); List<Field> fields = record.getFields(); - String sqlFront = String.format("insert into %s(timestamp", deviceId); - String sqlRear = String.format(") values(%d", record.getTimestamp()); + List<String> measurementList = new ArrayList<>(); + List<String> insertValues = new ArrayList<>(); for (int i = 0; i < fields.size(); i++) { Field field = fields.get(i); - if (field.toString() != "null") { - sqlFront = String.format("%s,%s", sqlFront, paths.get(i).getMeasurement()); + if (!field.isNull()) { + measurementList.add(paths.get(i).getMeasurement()); if (fields.get(i).getDataType() == TSDataType.TEXT) { - sqlRear = String.format("%s,'%s'", sqlRear, field.toString()); + insertValues.add(String.format("'%s'", field.toString())); } else { - sqlRear = String.format("%s,%s", sqlRear, field.toString()); + insertValues.add(String.format("%s", field.toString())); } } } - String sql = String.format("%s%s)", sqlFront, sqlRear); - - statement.addBatch(sql); - count++; - if (count > 10000) { - statement.executeBatch(); - statement.clearBatch(); - count = 0; - } + insertPlan = new InsertPlan(deviceId, record.getTimestamp(), measurementList, + insertValues); + insertExecutor.processNonQuery(insertPlan); } } - statement.executeBatch(); - statement.clearBatch(); } catch (IOException e) { logger.error("IoTDB receiver can not parse tsfile into SQL", e); - } catch (SQLException | ClassNotFoundException e) { - logger.error("IoTDB post back receiver: jdbc cannot connect to IoTDB", e); + } catch (ProcessorException e) { + logger.error("Meet error while processing non-query.", e); } finally { try { if (reader != null) { @@ -459,84 +485,68 @@ public class ServerServiceImpl implements ServerService.Iface { } catch (IOException e) { logger.error("IoTDB receiver : Cannot close file stream {}", filePath, e); } - try { - if (statement != null) { - statement.close(); - } - } catch (SQLException e) { - logger.error("IoTDB receiver : Can not close JDBC connection", e); - } } } /** * Insert those valid data in the tsfile into IoTDB * - * @param overlapFiles:files which are conflict with the postback file + * @param overlapFiles:files which are conflict with the sync file */ public void mergeOldData(String filePath, List<String> overlapFiles) { - Set<String> timeseries = new HashSet<>(); + Set<String> timeseriesList = new HashSet<>(); TsFileSequenceReader reader = null; - Statement statement = null; - try (Connection connection = DriverManager.getConnection( - String.format("jdbc:iotdb://localhost:%d/", config.getRpcPort()), "root", - "root")) { - Class.forName(JDBC_DRIVER_NAME); - statement = connection.createStatement(); - int count = 0; - + OverflowQPExecutor insertExecutor = new OverflowQPExecutor(); + try { reader = new TsFileSequenceReader(filePath); Map<String, TsDeviceMetadataIndex> deviceIdMap = reader.readFileMetadata().getDeviceMap(); Iterator<String> it = deviceIdMap.keySet().iterator(); while (it.hasNext()) { - String key = it.next(); // key represent devices - TsDeviceMetadataIndex deviceMetadataIndex = deviceIdMap.get(key); - TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(deviceMetadataIndex); + String deviceID = it.next(); + TsDeviceMetadataIndex deviceMI = deviceIdMap.get(deviceID); + TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(deviceMI); List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata .getChunkGroupMetaDataList(); - timeseries.clear(); - // firstly, get all timeseries in the same device + timeseriesList.clear(); + /** firstly, get all timeseries in the same device **/ for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) { List<ChunkMetaData> chunkMetaDataList = chunkGroupMetaData.getChunkMetaDataList(); for (ChunkMetaData timeSeriesChunkMetaData : chunkMetaDataList) { String measurementUID = timeSeriesChunkMetaData.getMeasurementUid(); - measurementUID = key + "." + measurementUID; - timeseries.add(measurementUID); + measurementUID = deviceID + "." + measurementUID; + timeseriesList.add(measurementUID); } } - // secondly, use tsFile Reader to form SQL - + /** secondly, use tsFile Reader to form SQL **/ ReadOnlyTsFile readOnlyTsFile = new ReadOnlyTsFile(reader); ArrayList<Path> paths = new ArrayList<>(); - for (String timesery : timeseries) { // compare data with one timesery in a round to get valid data + /** compare data with one timeseries in a round to get valid data **/ + for (String timeseries : timeseriesList) { paths.clear(); - paths.add(new Path(timesery)); - Map<String, String> originDataPoint = new HashMap<>(); - Map<String, String> newDataPoint = new HashMap<>(); - String sqlFormat = "insert into %s(timestamp,%s) values(%s,%s)"; + paths.add(new Path(timeseries)); + Map<InsertPlan, String> originDataPoint = new HashMap<>(); + Map<InsertPlan, String> newDataPoint = new HashMap<>(); QueryExpression queryExpression = QueryExpression.create(paths, null); QueryDataSet queryDataSet = readOnlyTsFile.query(queryExpression); while (queryDataSet.hasNext()) { RowRecord record = queryDataSet.next(); List<Field> fields = record.getFields(); - String sql; - for (int i = 0; i < fields.size(); - i++) { // get all data with the timesery in the postback file + /** get all data with the timeseries in the sync file **/ + for (int i = 0; i < fields.size(); i++) { Field field = fields.get(i); + List<String> measurementList = new ArrayList<>(); if (!field.isNull()) { - sql = String - .format(sqlFormat, key, paths.get(i).getMeasurement(), record.getTimestamp(), - "%s"); - if (field.getDataType() == TSDataType.TEXT) { - newDataPoint.put(sql, "'" + field.toString() + "'"); - } else { - newDataPoint.put(sql, field.toString()); - } + measurementList.add(paths.get(i).getMeasurement()); + InsertPlan insertPlan = new InsertPlan(deviceID, record.getTimestamp(), + measurementList, new ArrayList<>()); + newDataPoint.put(insertPlan, + field.getDataType() == TSDataType.TEXT ? String.format("'%s'", field.toString()) + : field.toString()); } } } - for (String overlapFile : overlapFiles) // get all data with the timesery in all overlap files. - { + /** get all data with the timeseries in all overlap files. **/ + for (String overlapFile : overlapFiles) { TsFileSequenceReader inputOverlap = null; try { inputOverlap = new TsFileSequenceReader(overlapFile); @@ -545,17 +555,17 @@ public class ServerServiceImpl implements ServerService.Iface { while (queryDataSetOverlap.hasNext()) { RowRecord recordOverlap = queryDataSetOverlap.next(); List<Field> fields = recordOverlap.getFields(); - String sql; for (int i = 0; i < fields.size(); i++) { Field field = fields.get(i); + List<String> measurementList = new ArrayList<>(); if (!field.isNull()) { - sql = String.format(sqlFormat, key, - paths.get(i).getMeasurement(), recordOverlap.getTimestamp(), "%s"); - if (field.getDataType() == TSDataType.TEXT) { - originDataPoint.put(sql, "'" + field.toString() + "'"); - } else { - originDataPoint.put(sql, field.toString()); - } + measurementList.add(paths.get(i).getMeasurement()); + InsertPlan insertPlan = new InsertPlan(deviceID, recordOverlap.getTimestamp(), + measurementList, new ArrayList<>()); + originDataPoint.put(insertPlan, + field.getDataType() == TSDataType.TEXT ? String + .format("'%s'", field.toString()) + : field.toString()); } } } @@ -565,57 +575,42 @@ public class ServerServiceImpl implements ServerService.Iface { } } } - if (originDataPoint - .isEmpty()) { // If there has no overlap data with the timesery, inserting all data in the postback file - for (Map.Entry<String, String> entry : newDataPoint.entrySet()) { - String sql = String.format(entry.getKey(), entry.getValue()); - statement.addBatch(sql); - count++; - if (count > 10000) { - statement.executeBatch(); - statement.clearBatch(); - count = 0; - } + + /** If there has no overlap data with the timeseries, inserting all data in the sync file **/ + if (originDataPoint.isEmpty()) { + for (Map.Entry<InsertPlan, String> entry : newDataPoint.entrySet()) { + InsertPlan insertPlan = entry.getKey(); + List<String> insertValues = new ArrayList<>(); + insertValues.add(entry.getValue()); + insertPlan.setValues(insertValues); + insertExecutor.processNonQuery(insertPlan); } - } else { // Compare every data to get valid data - for (Map.Entry<String, String> entry : newDataPoint.entrySet()) { + } else { + /** Compare every data to get valid data **/ + for (Map.Entry<InsertPlan, String> entry : newDataPoint.entrySet()) { if (!originDataPoint.containsKey(entry.getKey()) || (originDataPoint.containsKey(entry.getKey()) && !originDataPoint.get(entry.getKey()).equals(entry.getValue()))) { - String sql = String.format(entry.getKey(), entry.getValue()); - statement.addBatch(sql); - count++; - if (count > 10000) { - statement.executeBatch(); - statement.clearBatch(); - count = 0; - } + InsertPlan insertPlan = entry.getKey(); + List<String> insertValues = new ArrayList<>(); + insertValues.add(entry.getValue()); + insertPlan.setValues(insertValues); + insertExecutor.processNonQuery(insertPlan); } } } } } - statement.executeBatch(); - statement.clearBatch(); - } catch (SQLException e) { - logger.error("IoTDB post back receiver: sql cannot execute successfully in IoTDB", e); - } catch (ClassNotFoundException e) { - logger.error("IoTDB post back receiver: jdbc cannot connect to IoTDB", e); } catch (IOException e) { logger.error("IoTDB receiver can not parse tsfile into SQL", e); + } catch (ProcessorException e) { + logger.error("Meet error while processing non-query.", e); } finally { try { reader.close(); } catch (IOException e) { logger.error("IoTDB receiver : Cannot close file stream {}", filePath, e); } - try { - if (statement != null) { - statement.close(); - } - } catch (SQLException e) { - logger.error("IoTDB receiver : Can not close JDBC connection", e); - } } } @@ -685,8 +680,10 @@ public class ServerServiceImpl implements ServerService.Iface { } num++; - logger.info(String - .format("IoTDB receiver : Merging files has completed : %d/%d", num, fileNum.get())); + if (logger.isInfoEnabled()) { + logger.info(String + .format("IoTDB receiver : Merging files has completed : %d/%d", num, fileNum.get())); + } } } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/postback/sender/FileManager.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileManager.java similarity index 67% rename from iotdb/src/main/java/org/apache/iotdb/db/postback/sender/FileManager.java rename to iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileManager.java index d46a308..f0a3b66 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/postback/sender/FileManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileManager.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.postback.sender; +package org.apache.iotdb.db.sync.sender; import java.io.BufferedReader; import java.io.BufferedWriter; @@ -29,27 +29,43 @@ import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Set; - import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.postback.conf.PostBackSenderConfig; -import org.apache.iotdb.db.postback.conf.PostBackSenderDescriptor; +import org.apache.iotdb.db.sync.conf.Constans; +import org.apache.iotdb.db.sync.conf.SyncSenderConfig; +import org.apache.iotdb.db.sync.conf.SyncSenderDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * The class is to pick up which files need to postback. + * FileManager is used to pick up those tsfiles need to sync. * - * @author lta + * @author Tianan Li */ public class FileManager { private static final Logger LOGGER = LoggerFactory.getLogger(FileManager.class); - private Map<String, Set<String>> sendingFiles = new HashMap<>(); + + /** + * Files that need to be synchronized + **/ + private Map<String, Set<String>> validAllFiles = new HashMap<>(); + + /** + * All tsfiles in last synchronization process + **/ private Set<String> lastLocalFiles = new HashSet<>(); + + /** + * All tsfiles in data directory + **/ private Map<String, Set<String>> currentLocalFiles = new HashMap<>(); - private PostBackSenderConfig postbackConfig = PostBackSenderDescriptor.getInstance().getConfig(); - private IoTDBConfig tsfileConfig = IoTDBDescriptor.getInstance().getConfig(); + + private SyncSenderConfig syncConfig = SyncSenderDescriptor.getInstance().getConfig(); + + private IoTDBConfig systemConfig = IoTDBDescriptor.getInstance().getConfig(); + + private static final String RESTORE_SUFFIX = ".restore"; private FileManager() { } @@ -59,30 +75,30 @@ public class FileManager { } /** - * initialize FileManager. + * Initialize FileManager. */ - public void init() { - sendingFiles.clear(); + public void init() throws IOException { + validAllFiles.clear(); lastLocalFiles.clear(); currentLocalFiles.clear(); - getLastLocalFileList(postbackConfig.getLastFileInfo()); - getCurrentLocalFileList(tsfileConfig.getBufferWriteDirs()); - getSendingFileList(); + getLastLocalFileList(syncConfig.getLastFileInfo()); + getCurrentLocalFileList(systemConfig.getBufferWriteDirs()); + getValidFileList(); } /** - * get sending file list. + * get files that needs to be synchronized */ - public void getSendingFileList() { + public void getValidFileList() { for (Entry<String, Set<String>> entry : currentLocalFiles.entrySet()) { for (String path : entry.getValue()) { if (!lastLocalFiles.contains(path)) { - sendingFiles.get(entry.getKey()).add(path); + validAllFiles.get(entry.getKey()).add(path); } } } - LOGGER.info("IoTDB sender : Sender has got list of sending files."); - for (Entry<String, Set<String>> entry : sendingFiles.entrySet()) { + LOGGER.info("acquire list of valid files."); + for (Entry<String, Set<String>> entry : validAllFiles.entrySet()) { for (String path : entry.getValue()) { LOGGER.info(path); currentLocalFiles.get(entry.getKey()).remove(path); @@ -95,16 +111,16 @@ public class FileManager { * * @param path path */ - public void getLastLocalFileList(String path) { + public void getLastLocalFileList(String path) throws IOException { Set<String> fileList = new HashSet<>(); File file = new File(path); if (!file.exists()) { try { if (!file.createNewFile()) { - LOGGER.error("IoTDB post back sender: cannot create file {}", file.getAbsoluteFile()); + LOGGER.error("cannot create file {}", file.getAbsoluteFile()); } } catch (IOException e) { - LOGGER.error("IoTDB post back sender: cannot get last local file list", e); + throw new IOException("cannot get last local file list", e); } } else { try (BufferedReader bf = new BufferedReader(new FileReader(file))) { @@ -113,12 +129,11 @@ public class FileManager { fileList.add(fileName); } } catch (IOException e) { - LOGGER.error( - "IoTDB post back sender: cannot get last local file list when reading file {}.", - postbackConfig.getLastFileInfo(), e); + LOGGER.error("cannot get last local file list when reading file {}.", + syncConfig.getLastFileInfo()); + throw new IOException(e); } } - lastLocalFiles = fileList; } @@ -134,17 +149,17 @@ public class FileManager { } File[] listFiles = new File(path).listFiles(); for (File storageGroup : listFiles) { - if (storageGroup.isDirectory() && !storageGroup.getName().equals("postback")) { + if (storageGroup.isDirectory() && !storageGroup.getName().equals(Constans.SYNC)) { if (!currentLocalFiles.containsKey(storageGroup.getName())) { - currentLocalFiles.put(storageGroup.getName(), new HashSet<String>()); + currentLocalFiles.put(storageGroup.getName(), new HashSet<>()); } - if (!sendingFiles.containsKey(storageGroup.getName())) { - sendingFiles.put(storageGroup.getName(), new HashSet<String>()); + if (!validAllFiles.containsKey(storageGroup.getName())) { + validAllFiles.put(storageGroup.getName(), new HashSet<>()); } File[] files = storageGroup.listFiles(); for (File file : files) { - if (!file.getAbsolutePath().endsWith(".restore") && !new File( - file.getAbsolutePath() + ".restore").exists()) { + if (!file.getPath().endsWith(RESTORE_SUFFIX) && !new File( + file.getPath() + RESTORE_SUFFIX).exists()) { currentLocalFiles.get(storageGroup.getName()).add(file.getAbsolutePath()); } } @@ -166,12 +181,12 @@ public class FileManager { } } } catch (IOException e) { - LOGGER.error("IoTDB post back sender: cannot back up now local file info", e); + LOGGER.error("cannot back up now local file info", e); } } - public Map<String, Set<String>> getSendingFiles() { - return sendingFiles; + public Map<String, Set<String>> getValidAllFiles() { + return validAllFiles; } public Set<String> getLastLocalFiles() { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/postback/sender/FileSender.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSender.java similarity index 58% rename from iotdb/src/main/java/org/apache/iotdb/db/postback/sender/FileSender.java rename to iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSender.java index 0ca03c0..8e8ada9 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/postback/sender/FileSender.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSender.java @@ -16,51 +16,52 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.postback.sender; +package org.apache.iotdb.db.sync.sender; +import java.io.IOException; import java.util.Set; +import org.apache.iotdb.db.exception.SyncConnectionException; /** - * FileSender defines the methods of a sender in postback module. - * @author lta + * FileSender defines the methods of a sender in sync module. + * + * @author Tianan Li */ public interface FileSender { /** * Connect to server. */ - void connectToReceiver(String serverIp, int serverPort); + void establishConnection(String serverIp, int serverPort) throws SyncConnectionException; /** * Transfer UUID to receiver. */ - boolean transferUUID(String uuidPath); + boolean confirmIdentity(String uuidPath) throws SyncConnectionException, IOException; /** * Make file snapshots before sending files. */ - Set<String> makeFileSnapshot(Set<String> sendingFileList); + Set<String> makeFileSnapshot(Set<String> validFiles) throws IOException; /** * Send schema file to receiver. */ - void sendSchema(String schemaPath); + void syncSchema() throws SyncConnectionException; /** - * For each file in fileList, send it to receiver side. - * - * @param fileSnapshotList snapshot file list to send + * For all valid files, send it to receiver side and load these data in receiver. */ - void transferData(Set<String> fileSnapshotList); + void syncAllData() throws SyncConnectionException; /** * Close the socket after sending files. */ - boolean afterSending(); + boolean afterSynchronization() throws SyncConnectionException; /** - * Execute a postback task. + * Execute a sync task. */ - void postback(); + void sync() throws SyncConnectionException, IOException; } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java new file mode 100644 index 0000000..ea34a29 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java @@ -0,0 +1,516 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.sync.sender; + +import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.IOException; +import java.math.BigInteger; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.MessageDigest; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; +import org.apache.iotdb.db.concurrent.ThreadName; +import org.apache.iotdb.db.exception.SyncConnectionException; +import org.apache.iotdb.db.sync.conf.Constans; +import org.apache.iotdb.db.sync.conf.SyncSenderConfig; +import org.apache.iotdb.db.sync.conf.SyncSenderDescriptor; +import org.apache.iotdb.db.sync.receiver.ServerService; +import org.apache.iotdb.db.utils.SyncUtils; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * FileSenderImpl is used to transfer tsfiles that needs to sync to receiver. + * + * @author Tianan Li + */ +public class FileSenderImpl implements FileSender { + + private static final Logger LOGGER = LoggerFactory.getLogger(FileSenderImpl.class); + private TTransport transport; + private ServerService.Client serviceClient; + private List<String> schema = new ArrayList<>(); + + /** + * Mark the identity of sender + **/ + private String uuid; + + /** + * Monitor sync status + **/ + private Thread syncMonitor; + + /** + * Files that need to be synchronized + */ + private Map<String, Set<String>> validAllFiles; + + /** + * All tsfiles in data directory + **/ + private Map<String, Set<String>> currentLocalFiles; + + /** + * Mark the start time of last sync + **/ + private Date lastSyncTime = new Date(); + + /** + * If true, sync is in execution. + **/ + private volatile boolean syncStatus = false; + + /** + * Key means storage group, Set means corresponding tsfiles + **/ + private Map<String, Set<String>> validFileSnapshot = new HashMap<>(); + + private FileManager fileManager = FileManager.getInstance(); + private SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig(); + + /** + * Monitor sync status. + */ + private final Runnable monitorSyncStatus = () -> { + Date oldTime = new Date(); + while (true) { + if (Thread.interrupted()) { + break; + } + Date currentTime = new Date(); + if (currentTime.getTime() / 1000 == oldTime.getTime() / 1000) { + continue; + } + if ((currentTime.getTime() - lastSyncTime.getTime()) + % (config.getUploadCycleInSeconds() * 1000) == 0) { + oldTime = currentTime; + if (syncStatus) { + LOGGER.info("Sync process is in execution!"); + } + } + } + }; + + private FileSenderImpl() { + } + + public static final FileSenderImpl getInstance() { + return InstanceHolder.INSTANCE; + } + + /** + * Create a sender and sync files to the receiver. + * + * @param args not used + */ + public static void main(String[] args) + throws InterruptedException, IOException, SyncConnectionException { + Thread.currentThread().setName(ThreadName.SYNC_CLIENT.getName()); + FileSenderImpl fileSenderImpl = new FileSenderImpl(); + fileSenderImpl.verifyPort(); + fileSenderImpl.startMonitor(); + fileSenderImpl.timedTask(); + } + + /** + * Start Monitor Thread + */ + public void startMonitor() { + syncMonitor = new Thread(monitorSyncStatus, ThreadName.SYNC_MONITOR.getName()); + syncMonitor.setDaemon(true); + syncMonitor.start(); + } + + /** + * Start sync task in a certain time. + */ + public void timedTask() throws InterruptedException, SyncConnectionException, IOException { + sync(); + lastSyncTime = new Date(); + Date currentTime; + while (true) { + if (Thread.interrupted()) { + break; + } + Thread.sleep(2000); + currentTime = new Date(); + if (currentTime.getTime() - lastSyncTime.getTime() + > config.getUploadCycleInSeconds() * 1000) { + lastSyncTime = currentTime; + sync(); + } + } + } + + /** + * Execute a sync task. + */ + @Override + public void sync() throws SyncConnectionException, IOException { + + //1. Clear old snapshots if necessary + for (String snapshotPath : config.getSnapshotPaths()) { + if (new File(snapshotPath).exists() && new File(snapshotPath).list().length != 0) { + /** It means that the last task of sync does not succeed! Clear the files and start to sync again **/ + try { + SyncUtils.deleteFile(new File(snapshotPath)); + } catch (IOException e) { + LOGGER.error("can not delete file {}", snapshotPath); + throw new IOException(e); + } + } + } + + syncStatus = true; + + // 2. Connect to sync server and Confirm Identity + establishConnection(config.getServerIp(), config.getServerPort()); + if (!confirmIdentity(config.getUuidPath())) { + LOGGER.error("Sorry, you do not have the permission to connect to sync receiver."); + syncStatus = false; + return; + } + + // 3. Acquire valid files and check + fileManager.init(); + validAllFiles = fileManager.getValidAllFiles(); + currentLocalFiles = fileManager.getCurrentLocalFiles(); + if (SyncUtils.isEmpty(validAllFiles)) { + LOGGER.info("There has no file to sync !"); + syncStatus = false; + return; + } + + // 4. Create snapshot + for (Entry<String, Set<String>> entry : validAllFiles.entrySet()) { + validFileSnapshot.put(entry.getKey(), makeFileSnapshot(entry.getValue())); + } + + // 5. Sync schema + syncSchema(); + + // 6. Sync data + syncAllData(); + + // 7. clear snapshot + for (String snapshotPath : config.getSnapshotPaths()) { + try { + SyncUtils.deleteFile(new File(snapshotPath)); + } catch (IOException e) { + LOGGER.error("can not delete snapshot", e); + } + } + + // 8. notify receiver that synchronization finish + // At this point the synchronization has finished even if connection fails + try { + serviceClient.afterReceiving(); + } catch (TException e) { + LOGGER.error("unable to connect to receiver ", e); + } + transport.close(); + LOGGER.info("sync process has finished"); + syncStatus = false; + } + + @Override + public void syncAllData() throws SyncConnectionException { + for (Entry<String, Set<String>> entry : validAllFiles.entrySet()) { + Set<String> validFiles = entry.getValue(); + Set<String> validSnapshot = validFileSnapshot.get(entry.getKey()); + if (validSnapshot.isEmpty()) { + continue; + } + LOGGER.info("sync process starts to transfer data of storage group {}", entry.getKey()); + try { + serviceClient.init(entry.getKey()); + } catch (TException e) { + throw new SyncConnectionException("unable to connect to receiver", e); + } + syncData(validSnapshot); + if (afterSynchronization()) { + currentLocalFiles.get(entry.getKey()).addAll(validFiles); + fileManager.setCurrentLocalFiles(currentLocalFiles); + fileManager.backupNowLocalFileInfo(config.getLastFileInfo()); + LOGGER.info("sync process has finished storage group {}.", entry.getKey()); + } else { + throw new SyncConnectionException( + "receiver cannot sync data, abandon this synchronization"); + } + } + } + + /** + * Establish a connection between the sender and the receiver. + * + * @param serverIp the ip address of the receiver + * @param serverPort must be same with port receiver set. + */ + @Override + public void establishConnection(String serverIp, int serverPort) throws SyncConnectionException { + transport = new TSocket(serverIp, serverPort); + TProtocol protocol = new TBinaryProtocol(transport); + serviceClient = new ServerService.Client(protocol); + try { + transport.open(); + } catch (TTransportException e) { + syncStatus = false; + LOGGER.error("cannot connect to server"); + throw new SyncConnectionException(e); + } + } + + /** + * UUID marks the identity of sender for receiver. + */ + @Override + public boolean confirmIdentity(String uuidPath) throws SyncConnectionException, IOException { + File file = new File(uuidPath); + if (!file.getParentFile().exists()) { + file.getParentFile().mkdirs(); + } + if (!file.exists()) { + try (FileOutputStream out = new FileOutputStream(file)) { + if (!file.createNewFile()) { + LOGGER.error("cannot create file {}", file.getPath()); + } + uuid = generateUUID(); + out.write(uuid.getBytes()); + } catch (IOException e) { + LOGGER.error("cannot write UUID to file {}", file.getPath()); + throw new IOException(e); + } + } else { + try (BufferedReader bf = new BufferedReader((new FileReader(uuidPath)))) { + uuid = bf.readLine(); + } catch (IOException e) { + LOGGER.error("cannot read UUID from file{}", file.getPath()); + throw new IOException(e); + } + } + boolean legalConnection; + try { + legalConnection = serviceClient.getUUID(uuid, + InetAddress.getLocalHost().getHostAddress()); + } catch (Exception e) { + LOGGER.error("cannot confirm identity with receiver"); + throw new SyncConnectionException(e); + } + return legalConnection; + } + + private String generateUUID() { + return Constans.SYNC + UUID.randomUUID().toString().replaceAll("-", ""); + } + + /** + * Create snapshots for valid files. + */ + @Override + public Set<String> makeFileSnapshot(Set<String> validFiles) throws IOException { + Set<String> validFilesSnapshot = new HashSet<>(); + try { + for (String filePath : validFiles) { + String snapshotFilePath = SyncUtils.getSnapshotFilePath(filePath); + validFilesSnapshot.add(snapshotFilePath); + File newFile = new File(snapshotFilePath); + if (!newFile.getParentFile().exists()) { + newFile.getParentFile().mkdirs(); + } + Path link = FileSystems.getDefault().getPath(snapshotFilePath); + Path target = FileSystems.getDefault().getPath(filePath); + Files.createLink(link, target); + } + } catch (IOException e) { + LOGGER.error("can not make fileSnapshot"); + throw new IOException(e); + } + return validFilesSnapshot; + } + + /** + * Transfer data of a storage group to receiver. + * + * @param fileSnapshotList list of sending snapshot files in a storage group. + */ + public void syncData(Set<String> fileSnapshotList) throws SyncConnectionException { + try { + int successNum = 0; + for (String snapshotFilePath : fileSnapshotList) { + successNum++; + File file = new File(snapshotFilePath); + List<String> filePathSplit = new ArrayList<>(); + String os = System.getProperty("os.name"); + if (os.toLowerCase().startsWith("windows")) { + String[] name = snapshotFilePath.split(File.separator + File.separator); + filePathSplit.add(name[name.length - 2]); + filePathSplit.add(name[name.length - 1]); + } else { + String[] name = snapshotFilePath.split(File.separator); + filePathSplit.add(name[name.length - 2]); + filePathSplit.add(name[name.length - 1]); + } + while (true) { + // Sync all data to receiver + byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE]; + int data; + try (FileInputStream fis = new FileInputStream(file)) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(Constans.DATA_CHUNK_SIZE); + while ((data = fis.read(buffer)) != -1) { // cut the file into pieces to send + bos.write(buffer, 0, data); + ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray()); + bos.reset(); + serviceClient.startReceiving(null, filePathSplit, buffToSend, 1); + } + bos.close(); + } + + // Get md5 of the file. + MessageDigest md = MessageDigest.getInstance("MD5"); + try (FileInputStream fis = new FileInputStream(file)) { + while ((data = fis.read(buffer)) != -1) { + md.update(buffer, 0, data); + } + } + + // the file is sent successfully + String md5OfSender = (new BigInteger(1, md.digest())).toString(16); + String md5OfReceiver = serviceClient.startReceiving(md5OfSender, filePathSplit, + null, 0); + if (md5OfSender.equals(md5OfReceiver)) { + LOGGER.info("receiver has received {} successfully.", snapshotFilePath); + break; + } + } + if (LOGGER.isInfoEnabled()) { + LOGGER.info(String.format("Task of synchronization has completed %d/%d.", successNum, + fileSnapshotList.size())); + } + } + } catch (Exception e) { + throw new SyncConnectionException("cannot sync data with receiver.", e); + } + } + + /** + * Sync schema with receiver. + */ + @Override + public void syncSchema() throws SyncConnectionException { + try (FileInputStream fis = new FileInputStream(new File(config.getSchemaPath()))) { + int mBufferSize = 4 * 1024 * 1024; + ByteArrayOutputStream bos = new ByteArrayOutputStream(mBufferSize); + byte[] buffer = new byte[mBufferSize]; + int n; + while ((n = fis.read(buffer)) != -1) { // cut the file into pieces to send + bos.write(buffer, 0, n); + ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray()); + bos.reset(); + // 1 represents there is still schema buffer to send. + serviceClient.getSchema(buffToSend, 1); + } + bos.close(); + // 0 represents the schema file has been transferred completely. + serviceClient.getSchema(null, 0); + } catch (Exception e) { + LOGGER.error("cannot sync schema ", e); + throw new SyncConnectionException(e); + } + } + + @Override + public boolean afterSynchronization() throws SyncConnectionException { + boolean successOrNot; + try { + successOrNot = serviceClient.merge(); + } catch (TException e) { + throw new SyncConnectionException( + "can not finish sync process because sync receiver has broken down.", e); + } + return successOrNot; + } + + /** + * The method is to verify whether the client port is bind or not, ensuring that only one client + * is running. + */ + private void verifyPort() throws IOException { + try { + Socket socket = new Socket("localhost", config.getClientPort()); + socket.close(); + LOGGER.error("The sync client has been started!"); + System.exit(0); + } catch (IOException e) { + try (ServerSocket listenerSocket = new ServerSocket(config.getClientPort())) { + Thread listener = new Thread(() -> { + while (true) { + try { + listenerSocket.accept(); + } catch (IOException e2) { + LOGGER.error("IoTDB sync sender: unable to listen to port{}", + config.getClientPort(), e2); + } + } + }); + listener.start(); + } catch (IOException e1) { + LOGGER.error("unable to listen to port{}", config.getClientPort()); + throw new IOException(); + } + } + } + + private static class InstanceHolder { + + private static final FileSenderImpl INSTANCE = new FileSenderImpl(); + } + + public void setConfig(SyncSenderConfig config) { + this.config = config; + } + + public List<String> getSchema() { + return schema; + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/PostbackUtils.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java similarity index 70% rename from iotdb/src/main/java/org/apache/iotdb/db/utils/PostbackUtils.java rename to iotdb/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java index a23e0be..998aff2 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/utils/PostbackUtils.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java @@ -21,24 +21,25 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Set; -import org.apache.iotdb.db.postback.conf.PostBackSenderDescriptor; +import org.apache.iotdb.db.sync.conf.SyncSenderDescriptor; /** * @author lta */ -public class PostbackUtils { +public class SyncUtils { - private PostbackUtils(){} + private static final String IP_SEPARATOR = "\\."; - private static String[] snapshotPaths = PostBackSenderDescriptor.getInstance() + private static String[] snapshotPaths = SyncSenderDescriptor.getInstance() .getConfig().getSnapshotPaths(); + private SyncUtils() { + } + /** - * This method is to get a snapshot file seriesPath according to a tsfile seriesPath. Due to multiple directories, - * it's necessary to make a snapshot in the same disk. It's used by postback sender. - * - * @param filePath - * @return + * This method is to get a snapshot file seriesPath according to a tsfile seriesPath. Due to + * multiple directories, it's necessary to make a snapshot in the same disk. It's used by sync + * sender. */ public static String getSnapshotFilePath(String filePath) { String[] name; @@ -46,22 +47,23 @@ public class PostbackUtils { String os = System.getProperty("os.name"); if (os.toLowerCase().startsWith("windows")) { name = filePath.split(File.separator + File.separator); - relativeFilePath = - "data" + File.separator + name[name.length - 2] + File.separator + name[name.length - 1]; + relativeFilePath = name[name.length - 2] + File.separator + name[name.length - 1]; } else { name = filePath.split(File.separator); - relativeFilePath = - "data" + File.separator + name[name.length - 2] + File.separator + name[name.length - 1]; + relativeFilePath = name[name.length - 2] + File.separator + name[name.length - 1]; } String bufferWritePath = name[0]; for (int i = 1; i < name.length - 2; i++) { - bufferWritePath = bufferWritePath + File.separator + name[i]; + bufferWritePath = bufferWritePath + File.separatorChar + name[i]; } for (String snapshotPath : snapshotPaths) { if (snapshotPath.startsWith(bufferWritePath)) { if (!new File(snapshotPath).exists()) { new File(snapshotPath).mkdir(); } + if(snapshotPath.length() > 0 && snapshotPath.charAt(snapshotPath.length()-1)!=File.separatorChar){ + snapshotPath = snapshotPath + File.separatorChar; + } return snapshotPath + relativeFilePath; } } @@ -69,10 +71,7 @@ public class PostbackUtils { } /** - * Verify sending list is empty or not It's used by postback sender. - * - * @param sendingFileList - * @return + * Verify sending list is empty or not It's used by sync sender. */ public static boolean isEmpty(Map<String, Set<String>> sendingFileList) { for (Entry<String, Set<String>> entry : sendingFileList.entrySet()) { @@ -84,11 +83,8 @@ public class PostbackUtils { } /** - * Verify IP address with IP white list which contains more than one IP segment. It's used by postback sender. - * - * @param ipWhiteList - * @param ipAddress - * @return + * Verify IP address with IP white list which contains more than one IP segment. It's used by sync + * sender. */ public static boolean verifyIPSegment(String ipWhiteList, String ipAddress) { String[] ipSegments = ipWhiteList.split(","); @@ -104,43 +100,43 @@ public class PostbackUtils { /** * Verify IP address with IP segment. - * - * @param ipSegment - * @param ipAddress - * @param subnetMark - * @return */ private static boolean verifyIP(String ipSegment, String ipAddress, int subnetMark) { - String ipSegmentBinary = ""; - String ipAddressBinary = ""; - String[] ipSplits = ipSegment.split("\\."); + String ipSegmentBinary; + String ipAddressBinary; + String[] ipSplits = ipSegment.split(IP_SEPARATOR); DecimalFormat df = new DecimalFormat("00000000"); StringBuilder ipSegmentBuilder = new StringBuilder(); for (String IPsplit : ipSplits) { - ipSegmentBuilder.append(String.valueOf(df.format( - Integer.parseInt(Integer.toBinaryString(Integer.parseInt(IPsplit)))))); + ipSegmentBuilder.append(df.format( + Integer.parseInt(Integer.toBinaryString(Integer.parseInt(IPsplit))))); } ipSegmentBinary = ipSegmentBuilder.toString(); ipSegmentBinary = ipSegmentBinary.substring(0, subnetMark); - ipSplits = ipAddress.split("\\."); + ipSplits = ipAddress.split(IP_SEPARATOR); StringBuilder ipAddressBuilder = new StringBuilder(); for (String IPsplit : ipSplits) { - ipAddressBuilder.append(String.valueOf(df.format( - Integer.parseInt(Integer.toBinaryString(Integer.parseInt(IPsplit)))))); + ipAddressBuilder.append(df.format( + Integer.parseInt(Integer.toBinaryString(Integer.parseInt(IPsplit))))); } ipAddressBinary = ipAddressBuilder.toString(); ipAddressBinary = ipAddressBinary.substring(0, subnetMark); return ipAddressBinary.equals(ipSegmentBinary); } + /** + * Remove all files under this folder recursively + * + * @param file folder file + */ public static void deleteFile(File file) throws IOException { if (!file.exists()) { return; } if (file.isFile() || Objects.requireNonNull(file.list()).length == 0) { - if (!file.delete()){ + if (!file.delete()) { throw new IOException( - String.format("Cannot delete file : %s", file.getPath())); + String.format("cannot delete file : %s", file.getPath())); } } else { File[] files = file.listFiles(); @@ -149,7 +145,7 @@ public class PostbackUtils { deleteFile(f); if (!f.delete()) { throw new IOException( - String.format("Cannot delete file : %s", f.getPath())); + String.format("cannot delete file : %s", f.getPath())); } } } diff --git a/iotdb/src/main/thrift/ServerService.thrift b/iotdb/src/main/thrift/SyncServerService.thrift similarity index 79% rename from iotdb/src/main/thrift/ServerService.thrift rename to iotdb/src/main/thrift/SyncServerService.thrift index cdcc47c..d633d25 100755 --- a/iotdb/src/main/thrift/ServerService.thrift +++ b/iotdb/src/main/thrift/SyncServerService.thrift @@ -16,19 +16,23 @@ * specific language governing permissions and limitations * under the License. */ - namespace java org.apache.iotdb.db.postback.receiver +namespace java org.apache.iotdb.db.sync.receiver typedef i32 int typedef i16 short typedef i64 long + +enum SYNC_STATUS { + SUCCESS_STATUS, + SYNC_STATUS + +} + service ServerService{ bool getUUID(1:string uuid, 2:string address) - string startReceiving(1:string md5, 2:list<string> filename, 3:binary buff, 4:int status) - void getFileNodeInfo() - void mergeOldData(1:string path) - void mergeData() void getSchema(1:binary buff, 2:int status) - bool merge() + string receiveData(1:string md5, 2:list<string> filename, 3:binary buff, 4:int status) + bool load() void afterReceiving() void init(1:string storageGroup) } \ No newline at end of file diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodev2/FileNodeManagerBenchmark.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodev2/FileNodeManagerBenchmark.java index 5829f0e..55ee355 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodev2/FileNodeManagerBenchmark.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodev2/FileNodeManagerBenchmark.java @@ -26,7 +26,7 @@ import org.apache.iotdb.db.exception.FileNodeManagerException; import org.apache.iotdb.db.exception.MetadataArgsErrorException; import org.apache.iotdb.db.exception.PathErrorException; import org.apache.iotdb.db.metadata.MManager; -import org.apache.iotdb.db.postback.utils.RandomNum; +import org.apache.iotdb.db.sync.test.RandomNum; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; diff --git a/iotdb/src/test/java/org/apache/iotdb/db/postback/sender/FileManagerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/FileManagerTest.java similarity index 95% rename from iotdb/src/test/java/org/apache/iotdb/db/postback/sender/FileManagerTest.java rename to iotdb/src/test/java/org/apache/iotdb/db/sync/sender/FileManagerTest.java index f808a87..a2dd6c6 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/postback/sender/FileManagerTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/FileManagerTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.postback.sender; +package org.apache.iotdb.db.sync.sender; import java.io.File; import java.io.IOException; @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Random; import java.util.Set; +import org.apache.iotdb.db.sync.conf.Constans; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -34,9 +35,9 @@ import org.slf4j.LoggerFactory; public class FileManagerTest { - public static final String POST_BACK_DIRECTORY_TEST = "postback" + File.separator; + public static final String POST_BACK_DIRECTORY_TEST = Constans.SYNC + File.separator; public static final String LAST_FILE_INFO_TEST = - POST_BACK_DIRECTORY_TEST + "lastLocalFileList.txt"; + POST_BACK_DIRECTORY_TEST + Constans.LAST_LOCAL_FILE_NAME; public static final String SENDER_FILE_PATH_TEST = POST_BACK_DIRECTORY_TEST + "data"; FileManager manager = FileManager.getInstance(); private static final Logger LOGGER = LoggerFactory.getLogger(FileManagerTest.class); @@ -180,7 +181,7 @@ public class FileManagerTest { } String rand = String.valueOf(r.nextInt(10000)); String fileName = - SENDER_FILE_PATH_TEST + File.separator + String.valueOf(i) + File.separator + rand; + SENDER_FILE_PATH_TEST + File.separator + i + File.separator + rand; File file = new File(fileName); allFileList.get(String.valueOf(i)).add(file.getAbsolutePath()); if (!file.getParentFile().exists()) { @@ -268,7 +269,7 @@ public class FileManagerTest { allFileList = manager.getCurrentLocalFiles(); manager.getLastLocalFileList(LAST_FILE_INFO_TEST); lastlocalList = manager.getLastLocalFiles(); - manager.getSendingFileList(); + manager.getValidFileList(); assert (lastlocalList.isEmpty()); assert (isEmpty(allFileList)); @@ -302,8 +303,8 @@ public class FileManagerTest { allFileList = manager.getCurrentLocalFiles(); manager.backupNowLocalFileInfo(LAST_FILE_INFO_TEST); manager.getLastLocalFileList(LAST_FILE_INFO_TEST); - manager.getSendingFileList(); - sendingFileList = manager.getSendingFiles(); + manager.getValidFileList(); + sendingFileList = manager.getValidAllFiles(); assert (sendingFileList.size() == newFileList.size()); for (Entry<String, Set<String>> entry : sendingFileList.entrySet()) { assert (newFileList.containsKey(entry.getKey())); @@ -340,7 +341,7 @@ public class FileManagerTest { } String rand = String.valueOf(r.nextInt(10000)); String fileName = - SENDER_FILE_PATH_TEST + File.separator + String.valueOf(i) + File.separator + rand; + SENDER_FILE_PATH_TEST + File.separator + i + File.separator + rand; File file = new File(fileName); allFileList.get(String.valueOf(i)).add(file.getAbsolutePath()); newFileList.get(String.valueOf(i)).add(file.getAbsolutePath()); @@ -354,8 +355,8 @@ public class FileManagerTest { } manager.getCurrentLocalFileList(new String[]{SENDER_FILE_PATH_TEST}); manager.getLastLocalFileList(LAST_FILE_INFO_TEST); - manager.getSendingFileList(); - sendingFileList = manager.getSendingFiles(); + manager.getValidFileList(); + sendingFileList = manager.getValidAllFiles(); assert (sendingFileList.size() == newFileList.size()); for (Entry<String, Set<String>> entry : sendingFileList.entrySet()) { assert (newFileList.containsKey(entry.getKey())); diff --git a/iotdb/src/test/java/org/apache/iotdb/db/postback/sender/IoTDBSingleClientPostBackTest.java b/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/IoTDBSingleClientPostBackTest.java similarity index 97% rename from iotdb/src/test/java/org/apache/iotdb/db/postback/sender/IoTDBSingleClientPostBackTest.java rename to iotdb/src/test/java/org/apache/iotdb/db/sync/sender/IoTDBSingleClientPostBackTest.java index d401369..ccdafff 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/postback/sender/IoTDBSingleClientPostBackTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/IoTDBSingleClientPostBackTest.java @@ -16,11 +16,12 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.postback.sender; +package org.apache.iotdb.db.sync.sender; import static org.junit.Assert.fail; import java.io.File; +import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; @@ -31,9 +32,10 @@ import java.util.Set; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StartupException; +import org.apache.iotdb.db.exception.SyncConnectionException; import org.apache.iotdb.db.integration.Constant; -import org.apache.iotdb.db.postback.conf.PostBackSenderConfig; -import org.apache.iotdb.db.postback.conf.PostBackSenderDescriptor; +import org.apache.iotdb.db.sync.conf.SyncSenderConfig; +import org.apache.iotdb.db.sync.conf.SyncSenderDescriptor; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.jdbc.Config; @@ -41,7 +43,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * The test is to run a complete postback function Before you run the test, make sure receiver has + * The test is to run a complete sync function Before you run the test, make sure receiver has * been cleaned up and inited. */ public class IoTDBSingleClientPostBackTest { @@ -49,7 +51,7 @@ public class IoTDBSingleClientPostBackTest { FileSenderImpl fileSenderImpl = FileSenderImpl.getInstance(); private IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig(); private String serverIpTest = "192.168.130.17"; - private PostBackSenderConfig config = PostBackSenderDescriptor.getInstance().getConfig(); + private SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig(); private Set<String> dataSender = new HashSet<>(); private Set<String> dataReceiver = new HashSet<>(); private boolean success = true; @@ -143,7 +145,7 @@ public class IoTDBSingleClientPostBackTest { "insert into root.test.d0(timestamp,s1) values(3000,'1309')", "insert into root.test.d1.g0(timestamp,s0) values(400,1050)", "merge", "flush",}; private boolean testFlag = Constant.testFlag; - private static final String POSTBACK = "postback"; + private static final String POSTBACK = "sync"; private static final Logger logger = LoggerFactory.getLogger(IoTDBSingleClientPostBackTest.class); public static void main(String[] args) throws Exception { @@ -199,9 +201,9 @@ public class IoTDBSingleClientPostBackTest { } } - public void testPostback() { + public void testPostback() throws IOException, SyncConnectionException { if (testFlag) { - // the first time to postback + // the first time to sync logger.debug("It's the first time to post back!"); try { Thread.sleep(2000); @@ -220,7 +222,7 @@ public class IoTDBSingleClientPostBackTest { fail(e.getMessage()); } - fileSenderImpl.postback(); + fileSenderImpl.sync(); // Compare data of sender and receiver dataSender.clear(); @@ -315,7 +317,7 @@ public class IoTDBSingleClientPostBackTest { return; } - // the second time to postback + // the second time to sync logger.debug("It's the second time to post back!"); try { Thread.sleep(2000); @@ -343,7 +345,7 @@ public class IoTDBSingleClientPostBackTest { Thread.currentThread().interrupt(); } - fileSenderImpl.postback(); + fileSenderImpl.sync(); // Compare data of sender and receiver dataSender.clear(); @@ -443,7 +445,7 @@ public class IoTDBSingleClientPostBackTest { return; } - // the third time to postback + // the third time to sync logger.debug("It's the third time to post back!"); try { Thread.sleep(2000); @@ -465,7 +467,7 @@ public class IoTDBSingleClientPostBackTest { Thread.currentThread().interrupt(); } - fileSenderImpl.postback(); + fileSenderImpl.sync(); // Compare data of sender and receiver dataSender.clear(); diff --git a/iotdb/src/test/java/org/apache/iotdb/db/postback/sender/MultipleClientPostBackTest.java b/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/MultipleClientPostBackTest.java similarity index 99% rename from iotdb/src/test/java/org/apache/iotdb/db/postback/sender/MultipleClientPostBackTest.java rename to iotdb/src/test/java/org/apache/iotdb/db/sync/sender/MultipleClientPostBackTest.java index 2c30fb0..c988471 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/postback/sender/MultipleClientPostBackTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/MultipleClientPostBackTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.postback.sender; +package org.apache.iotdb.db.sync.sender; import static org.junit.Assert.fail; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/postback/utils/CreateDataSender1.java b/iotdb/src/test/java/org/apache/iotdb/db/sync/test/CreateDataSender1.java similarity index 96% rename from iotdb/src/main/java/org/apache/iotdb/db/postback/utils/CreateDataSender1.java rename to iotdb/src/test/java/org/apache/iotdb/db/sync/test/CreateDataSender1.java index e5e9664..42f1965 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/postback/utils/CreateDataSender1.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/sync/test/CreateDataSender1.java @@ -16,9 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.postback.utils; +package org.apache.iotdb.db.sync.test; -import static org.apache.iotdb.db.postback.utils.RandomNum.getRandomInt; +import static org.apache.iotdb.db.sync.test.RandomNum.getRandomInt; import java.io.BufferedReader; import java.io.File; @@ -37,10 +37,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * The class is to generate data of whole timeseries (simulating jilian scene) to test stability of - * postback function. + * CreateDataSender1 is used to generate data of whole timeseries (simulating jilian scene) to test stability of + * sync function. * - * @author lta + * @author Tianan Li */ public class CreateDataSender1 { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/postback/utils/CreateDataSender2.java b/iotdb/src/test/java/org/apache/iotdb/db/sync/test/CreateDataSender2.java similarity index 97% rename from iotdb/src/main/java/org/apache/iotdb/db/postback/utils/CreateDataSender2.java rename to iotdb/src/test/java/org/apache/iotdb/db/sync/test/CreateDataSender2.java index 07cb144..f335e5e 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/postback/utils/CreateDataSender2.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/sync/test/CreateDataSender2.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.postback.utils; +package org.apache.iotdb.db.sync.test; import java.io.BufferedReader; import java.io.File; @@ -35,10 +35,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * The class is to generate data of half timeseries (simulating jilian scene) to test stability of - * postback function. + * CreateDataSender2 is used to generate data of half timeseries (simulating jilian scene) to test stability of + * sync function. * - * @author lta + * @author Tianan Li */ public class CreateDataSender2 { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/postback/utils/CreateDataSender3.java b/iotdb/src/test/java/org/apache/iotdb/db/sync/test/CreateDataSender3.java similarity index 98% rename from iotdb/src/main/java/org/apache/iotdb/db/postback/utils/CreateDataSender3.java rename to iotdb/src/test/java/org/apache/iotdb/db/sync/test/CreateDataSender3.java index e3f05a2..83693ca 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/postback/utils/CreateDataSender3.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/sync/test/CreateDataSender3.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.postback.utils; +package org.apache.iotdb.db.sync.test; import java.io.BufferedReader; import java.io.File; @@ -35,10 +35,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * The class is to generate data of another half timeseries (simulating jilian scene) which is - * different to those in CreateDataSender2 to test stability of postback function. + * CreateDataSender3 is used to generate data of another half timeseries (simulating jilian scene) which is + * different to those in CreateDataSender2 to test stability of sync function. * - * @author lta + * @author Tianan Li */ public class CreateDataSender3 { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/postback/utils/RandomNum.java b/iotdb/src/test/java/org/apache/iotdb/db/sync/test/RandomNum.java similarity index 97% rename from iotdb/src/main/java/org/apache/iotdb/db/postback/utils/RandomNum.java rename to iotdb/src/test/java/org/apache/iotdb/db/sync/test/RandomNum.java index db28f3c..1ffee67 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/postback/utils/RandomNum.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/sync/test/RandomNum.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.postback.utils; +package org.apache.iotdb.db.sync.test; import java.util.Random; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/postback/utils/Utils.java b/iotdb/src/test/java/org/apache/iotdb/db/sync/test/Utils.java similarity index 94% rename from iotdb/src/main/java/org/apache/iotdb/db/postback/utils/Utils.java rename to iotdb/src/test/java/org/apache/iotdb/db/sync/test/Utils.java index 3958024..5f968d4 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/postback/utils/Utils.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/sync/test/Utils.java @@ -16,10 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.postback.utils; +package org.apache.iotdb.db.sync.test; /** - * Created by stefanie on 07/08/2017. + * * @author Tianan Li */ public class Utils {
