This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch Sync-Reconstruct
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 2f2e1d8e9c0922f9fcd5b4ac73c7e755d53f9865
Author: lta <[email protected]>
AuthorDate: Sun Mar 17 15:01:26 2019 +0800

    reconstruct thrift sync service
---
 .../iotdb/db/sync/receiver/ServerServiceImpl.java  | 326 +++++++++++----------
 .../iotdb/db/sync/sender/FileSenderImpl.java       |  22 +-
 .../src/main/thrift/sync.thrift                    |  15 +-
 3 files changed, 183 insertions(+), 180 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java
index de2b2a3..435c20b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java
@@ -51,6 +51,8 @@ import org.apache.iotdb.db.metadata.MetadataOperationType;
 import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.utils.SyncUtils;
+import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
+import org.apache.iotdb.service.sync.thrift.SyncService;
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
@@ -72,7 +74,7 @@ import org.slf4j.LoggerFactory;
 /**
  * @author Tianan Li
  */
-public class ServerServiceImpl implements ServerService.Iface {
+public class ServerServiceImpl implements SyncService.Iface {
 
   private static final Logger logger = 
LoggerFactory.getLogger(ServerServiceImpl.class);
   private static final FileNodeManager fileNodeManager = 
FileNodeManager.getInstance();
@@ -145,75 +147,6 @@ public class ServerServiceImpl implements ServerService.Iface {
     return SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress);
   }
 
-  /**
-   * Start receiving tsfile from sender
-   *
-   * @param status status = 0 : finish receiving one tsfile status = 1 : a 
tsfile has not received
-   * completely.
-   */
-  @Override
-  public String startReceiving(String md5OfSender, List<String> filePathSplit,
-      ByteBuffer dataToReceive, int status) throws TException {
-    String md5OfReceiver = "";
-    StringBuilder filePathBuilder = new StringBuilder();
-    FileChannel channel;
-    for (int i = 0; i < filePathSplit.size(); i++) {
-      if (i == filePathSplit.size() - 1) {
-        filePathBuilder.append(filePathSplit.get(i));
-      } else {
-        filePathBuilder.append(filePathSplit.get(i)).append(File.separator);
-      }
-    }
-    String filePath = filePathBuilder.toString();
-    filePath = postbackPath + uuid.get() + File.separator + filePath;
-    if (status == 1) { // there are still data stream to add
-      File file = new File(filePath);
-      if (!file.getParentFile().exists()) {
-        try {
-          file.getParentFile().mkdirs();
-          if (!file.createNewFile()) {
-            logger.error("IoTDB post back receiver: cannot create file {}", 
file.getAbsoluteFile());
-          }
-        } catch (IOException e) {
-          logger.error("IoTDB post back receiver: cannot make file", e);
-        }
-      }
-      try (FileOutputStream fos = new FileOutputStream(file, true)) {// append 
new data
-        channel = fos.getChannel();
-        channel.write(dataToReceive);
-      } catch (IOException e) {
-        logger.error("IoTDB post back receiver: cannot write data to file", e);
-
-      }
-    } else { // all data in the same file has received successfully
-      try (FileInputStream fis = new FileInputStream(filePath)) {
-        MessageDigest md = MessageDigest.getInstance("MD5");
-        int mBufferSize = 8 * 1024 * 1024;
-        byte[] buffer = new byte[mBufferSize];
-        int n;
-        while ((n = fis.read(buffer)) != -1) {
-          md.update(buffer, 0, n);
-        }
-        md5OfReceiver = (new BigInteger(1, md.digest())).toString(16);
-        if (md5OfSender.equals(md5OfReceiver)) {
-          fileNum.set(fileNum.get() + 1);
-          if (logger.isInfoEnabled()) {
-            logger.info(String
-                .format("IoTDB post back receiver : Receiver has received %d 
files from sender",
-                    fileNum.get()));
-          }
-        } else {
-          if (!new File(filePath).delete()) {
-            logger.error("IoTDB post back receiver : Receiver can not delete 
file {}",
-                new File(filePath).getAbsolutePath());
-          }
-        }
-      } catch (Exception e) {
-        logger.error("IoTDB post back receiver: cannot generate md5", e);
-      }
-    }
-    return md5OfReceiver;
-  }
 
   /**
    * Get schema from sender
@@ -222,8 +155,8 @@ public class ServerServiceImpl implements ServerService.Iface {
    * IoTDB through jdbc status = 1 : the schema file has not received 
completely.
    */
   @Override
-  public void getSchema(ByteBuffer schema, int status) {
-    if (status == 0) {
+  public void getSchema(ByteBuffer schema, SyncDataStatus status) {
+    if (status == SyncDataStatus.SUCCESS_STATUS) {
       /** sync metadata, include storage group and timeseries **/
       syncMetadata();
     } else {
@@ -319,10 +252,81 @@ public class ServerServiceImpl implements ServerService.Iface {
     }
   }
 
+  /**
+   * Start receiving tsfile from sender
+   *
+   * @param status status = 0 : finish receiving one tsfile status = 1 : a 
tsfile has not received
+   * completely.
+   */
+  @Override
+  public String receiveData(String md5OfSender, List<String> filePathSplit,
+      ByteBuffer dataToReceive, SyncDataStatus status) {
+    String md5OfReceiver = "";
+    StringBuilder filePathBuilder = new StringBuilder();
+    FileChannel channel;
+    for (int i = 0; i < filePathSplit.size(); i++) {
+      if (i == filePathSplit.size() - 1) {
+        filePathBuilder.append(filePathSplit.get(i));
+      } else {
+        filePathBuilder.append(filePathSplit.get(i)).append(File.separator);
+      }
+    }
+    String filePath = filePathBuilder.toString();
+    filePath = postbackPath + uuid.get() + File.separator + filePath;
+    if (status == SyncDataStatus.PROCESSING_STATUS) { // there are still data 
stream to add
+      File file = new File(filePath);
+      if (!file.getParentFile().exists()) {
+        try {
+          file.getParentFile().mkdirs();
+          if (!file.createNewFile()) {
+            logger.error("IoTDB post back receiver: cannot create file {}", 
file.getAbsoluteFile());
+          }
+        } catch (IOException e) {
+          logger.error("IoTDB post back receiver: cannot make file", e);
+        }
+      }
+      try (FileOutputStream fos = new FileOutputStream(file, true)) {// append 
new data
+        channel = fos.getChannel();
+        channel.write(dataToReceive);
+      } catch (IOException e) {
+        logger.error("IoTDB post back receiver: cannot write data to file", e);
+
+      }
+    } else { // all data in the same file has received successfully
+      try (FileInputStream fis = new FileInputStream(filePath)) {
+        MessageDigest md = MessageDigest.getInstance("MD5");
+        int mBufferSize = 8 * 1024 * 1024;
+        byte[] buffer = new byte[mBufferSize];
+        int n;
+        while ((n = fis.read(buffer)) != -1) {
+          md.update(buffer, 0, n);
+        }
+        md5OfReceiver = (new BigInteger(1, md.digest())).toString(16);
+        if (md5OfSender.equals(md5OfReceiver)) {
+          fileNum.set(fileNum.get() + 1);
+          if (logger.isInfoEnabled()) {
+            logger.info(String
+                .format("IoTDB post back receiver : Receiver has received %d 
files from sender",
+                    fileNum.get()));
+          }
+        } else {
+          if (!new File(filePath).delete()) {
+            logger.error("IoTDB post back receiver : Receiver can not delete 
file {}",
+                new File(filePath).getAbsolutePath());
+          }
+        }
+      } catch (Exception e) {
+        logger.error("IoTDB post back receiver: cannot generate md5", e);
+      }
+    }
+    return md5OfReceiver;
+  }
+
+
   @Override
-  public boolean merge() throws TException {
+  public boolean load() throws TException {
     getFileNodeInfo();
-    mergeData();
+    loadData();
     try {
       SyncUtils.deleteFile(new File(postbackPath + this.uuid.get()));
     } catch (IOException e) {
@@ -345,24 +349,9 @@ public class ServerServiceImpl implements ServerService.Iface {
   }
 
   /**
-   * Release threadLocal variable resources
-   */
-  @Override
-  public void afterReceiving() {
-    uuid.remove();
-    fileNum.remove();
-    fileNodeMap.remove();
-    fileNodeStartTime.remove();
-    fileNodeEndTime.remove();
-    schemaFromSenderPath.remove();
-    logger.info("IoTDB post back receiver: the postBack has finished!");
-  }
-
-  /**
    * Get all tsfiles' info which are sent from sender, it is prepare for 
merging these data
    */
-  @Override
-  public void getFileNodeInfo() throws TException {
+  public void getFileNodeInfo() {
     String filePath = postbackPath + uuid.get() + File.separator + "data";
     File root = new File(filePath);
     File[] files = root.listFiles();
@@ -411,11 +400,84 @@ public class ServerServiceImpl implements ServerService.Iface {
     }
   }
 
+
+  /**
+   * It is to merge data. If data in the tsfile is new, append the tsfile to 
the storage group
+   * directly. If data in the tsfile is old, it has two strategy to merge.It 
depends on the
+   * possibility of updating historical data.
+   */
+  public void loadData() throws TException {
+    int num = 0;
+    for (String storageGroup : fileNodeMap.get().keySet()) {
+      List<String> filesPath = fileNodeMap.get().get(storageGroup);
+      // before load extern tsFile, it is necessary to order files in the same 
SG
+      for (int i = 0; i < filesPath.size(); i++) {
+        for (int j = i + 1; j < filesPath.size(); j++) {
+          boolean swapOrNot = false;
+          Map<String, Long> startTimeI = 
fileNodeStartTime.get().get(filesPath.get(i));
+          Map<String, Long> endTimeI = 
fileNodeStartTime.get().get(filesPath.get(i));
+          Map<String, Long> startTimeJ = 
fileNodeStartTime.get().get(filesPath.get(j));
+          Map<String, Long> endTimeJ = 
fileNodeStartTime.get().get(filesPath.get(j));
+          for (String deviceId : endTimeI.keySet()) {
+            if (startTimeJ.containsKey(deviceId) && startTimeI.get(deviceId) >
+                endTimeJ.get(deviceId)) {
+              swapOrNot = true;
+              break;
+            }
+          }
+          if (swapOrNot) {
+            String temp = filesPath.get(i);
+            filesPath.set(i, filesPath.get(j));
+            filesPath.set(j, temp);
+          }
+        }
+      }
+
+      for (String path : filesPath) {
+        // get startTimeMap and endTimeMap
+        Map<String, Long> startTimeMap = fileNodeStartTime.get().get(path);
+        Map<String, Long> endTimeMap = fileNodeEndTime.get().get(path);
+
+        // create a new fileNode
+        String header = postbackPath + uuid.get() + File.separator + "data" + 
File.separator;
+        String relativePath = path.substring(header.length());
+        TsFileResource fileNode = new TsFileResource(startTimeMap, endTimeMap,
+            OverflowChangeType.NO_CHANGE,
+            Directories.getInstance().getNextFolderIndexForTsFile(), 
relativePath);
+        // call interface of load external file
+        try {
+          if (!fileNodeManager.appendFileToFileNode(storageGroup, fileNode, 
path)) {
+            // it is a file with overflow data
+            if (config.isUpdate_historical_data_possibility()) {
+              loadOldData(path);
+            } else {
+              List<String> overlapFiles = 
fileNodeManager.getOverlapFilesFromFileNode(
+                  storageGroup,
+                  fileNode, uuid.get());
+              if (overlapFiles.isEmpty()) {
+                loadOldData(path);
+              } else {
+                loadOldData(path, overlapFiles);
+              }
+            }
+          }
+        } catch (FileNodeManagerException e) {
+          logger.error("IoTDB receiver : Can not load external file ", e);
+        }
+
+        num++;
+        if (logger.isInfoEnabled()) {
+          logger.info(String
+              .format("IoTDB receiver : Merging files has completed : %d/%d", 
num, fileNum.get()));
+        }
+      }
+    }
+  }
+
   /**
    * Insert all data in the tsfile into IoTDB.
    */
-  @Override
-  public void mergeOldData(String filePath) throws TException {
+  public void loadOldData(String filePath) throws TException {
     Set<String> timeseriesSet = new HashSet<>();
     TsFileSequenceReader reader = null;
     OverflowQPExecutor insertExecutor = new OverflowQPExecutor();
@@ -493,7 +555,7 @@ public class ServerServiceImpl implements ServerService.Iface {
    *
    * @param overlapFiles:files which are conflict with the sync file
    */
-  public void mergeOldData(String filePath, List<String> overlapFiles) {
+  public void loadOldData(String filePath, List<String> overlapFiles) {
     Set<String> timeseriesList = new HashSet<>();
     TsFileSequenceReader reader = null;
     OverflowQPExecutor insertExecutor = new OverflowQPExecutor();
@@ -615,77 +677,17 @@ public class ServerServiceImpl implements ServerService.Iface {
   }
 
   /**
-   * It is to merge data. If data in the tsfile is new, append the tsfile to 
the storage group
-   * directly. If data in the tsfile is old, it has two strategy to merge.It 
depends on the
-   * possibility of updating historical data.
+   * Release threadLocal variable resources
    */
   @Override
-  public void mergeData() throws TException {
-    int num = 0;
-    for (String storageGroup : fileNodeMap.get().keySet()) {
-      List<String> filesPath = fileNodeMap.get().get(storageGroup);
-      // before load extern tsFile, it is necessary to order files in the same 
SG
-      for (int i = 0; i < filesPath.size(); i++) {
-        for (int j = i + 1; j < filesPath.size(); j++) {
-          boolean swapOrNot = false;
-          Map<String, Long> startTimeI = 
fileNodeStartTime.get().get(filesPath.get(i));
-          Map<String, Long> endTimeI = 
fileNodeStartTime.get().get(filesPath.get(i));
-          Map<String, Long> startTimeJ = 
fileNodeStartTime.get().get(filesPath.get(j));
-          Map<String, Long> endTimeJ = 
fileNodeStartTime.get().get(filesPath.get(j));
-          for (String deviceId : endTimeI.keySet()) {
-            if (startTimeJ.containsKey(deviceId) && startTimeI.get(deviceId) >
-                endTimeJ.get(deviceId)) {
-              swapOrNot = true;
-              break;
-            }
-          }
-          if (swapOrNot) {
-            String temp = filesPath.get(i);
-            filesPath.set(i, filesPath.get(j));
-            filesPath.set(j, temp);
-          }
-        }
-      }
-
-      for (String path : filesPath) {
-        // get startTimeMap and endTimeMap
-        Map<String, Long> startTimeMap = fileNodeStartTime.get().get(path);
-        Map<String, Long> endTimeMap = fileNodeEndTime.get().get(path);
-
-        // create a new fileNode
-        String header = postbackPath + uuid.get() + File.separator + "data" + 
File.separator;
-        String relativePath = path.substring(header.length());
-        TsFileResource fileNode = new TsFileResource(startTimeMap, endTimeMap,
-            OverflowChangeType.NO_CHANGE,
-            Directories.getInstance().getNextFolderIndexForTsFile(), 
relativePath);
-        // call interface of load external file
-        try {
-          if (!fileNodeManager.appendFileToFileNode(storageGroup, fileNode, 
path)) {
-            // it is a file with overflow data
-            if (config.isUpdate_historical_data_possibility()) {
-              mergeOldData(path);
-            } else {
-              List<String> overlapFiles = 
fileNodeManager.getOverlapFilesFromFileNode(
-                  storageGroup,
-                  fileNode, uuid.get());
-              if (overlapFiles.isEmpty()) {
-                mergeOldData(path);
-              } else {
-                mergeOldData(path, overlapFiles);
-              }
-            }
-          }
-        } catch (FileNodeManagerException e) {
-          logger.error("IoTDB receiver : Can not load external file ", e);
-        }
-
-        num++;
-        if (logger.isInfoEnabled()) {
-          logger.info(String
-              .format("IoTDB receiver : Merging files has completed : %d/%d", 
num, fileNum.get()));
-        }
-      }
-    }
+  public void cleanUp() {
+    uuid.remove();
+    fileNum.remove();
+    fileNodeMap.remove();
+    fileNodeStartTime.remove();
+    fileNodeEndTime.remove();
+    schemaFromSenderPath.remove();
+    logger.info("IoTDB post back receiver: the postBack has finished!");
   }
 
   public Map<String, List<String>> getFileNodeMap() {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java
index ea34a29..e8939de 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java
@@ -48,8 +48,9 @@ import org.apache.iotdb.db.exception.SyncConnectionException;
 import org.apache.iotdb.db.sync.conf.Constans;
 import org.apache.iotdb.db.sync.conf.SyncSenderConfig;
 import org.apache.iotdb.db.sync.conf.SyncSenderDescriptor;
-import org.apache.iotdb.db.sync.receiver.ServerService;
 import org.apache.iotdb.db.utils.SyncUtils;
+import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
+import org.apache.iotdb.service.sync.thrift.SyncService;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
@@ -68,7 +69,7 @@ public class FileSenderImpl implements FileSender {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(FileSenderImpl.class);
   private TTransport transport;
-  private ServerService.Client serviceClient;
+  private SyncService.Client serviceClient;
   private List<String> schema = new ArrayList<>();
 
   /**
@@ -245,7 +246,7 @@ public class FileSenderImpl implements FileSender {
     // 8. notify receiver that synchronization finish
     // At this point the synchronization has finished even if connection fails
     try {
-      serviceClient.afterReceiving();
+      serviceClient.cleanUp();
     } catch (TException e) {
       LOGGER.error("unable to connect to receiver ", e);
     }
@@ -291,7 +292,7 @@ public class FileSenderImpl implements FileSender {
   public void establishConnection(String serverIp, int serverPort) throws 
SyncConnectionException {
     transport = new TSocket(serverIp, serverPort);
     TProtocol protocol = new TBinaryProtocol(transport);
-    serviceClient = new ServerService.Client(protocol);
+    serviceClient = new SyncService.Client(protocol);
     try {
       transport.open();
     } catch (TTransportException e) {
@@ -401,7 +402,8 @@ public class FileSenderImpl implements FileSender {
               bos.write(buffer, 0, data);
               ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
               bos.reset();
-              serviceClient.startReceiving(null, filePathSplit, buffToSend, 1);
+              serviceClient
+                  .receiveData(null, filePathSplit, buffToSend, 
SyncDataStatus.PROCESSING_STATUS);
             }
             bos.close();
           }
@@ -416,8 +418,8 @@ public class FileSenderImpl implements FileSender {
 
           // the file is sent successfully
           String md5OfSender = (new BigInteger(1, md.digest())).toString(16);
-          String md5OfReceiver = serviceClient.startReceiving(md5OfSender, 
filePathSplit,
-              null, 0);
+          String md5OfReceiver = serviceClient.receiveData(md5OfSender, 
filePathSplit,
+              null, SyncDataStatus.SUCCESS_STATUS);
           if (md5OfSender.equals(md5OfReceiver)) {
             LOGGER.info("receiver has received {} successfully.", 
snapshotFilePath);
             break;
@@ -448,11 +450,11 @@ public class FileSenderImpl implements FileSender {
         ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
         bos.reset();
         // 1 represents there is still schema buffer to send.
-        serviceClient.getSchema(buffToSend, 1);
+        serviceClient.getSchema(buffToSend, SyncDataStatus.PROCESSING_STATUS);
       }
       bos.close();
       // 0 represents the schema file has been transferred completely.
-      serviceClient.getSchema(null, 0);
+      serviceClient.getSchema(null, SyncDataStatus.SUCCESS_STATUS);
     } catch (Exception e) {
       LOGGER.error("cannot sync schema ", e);
       throw new SyncConnectionException(e);
@@ -463,7 +465,7 @@ public class FileSenderImpl implements FileSender {
   public boolean afterSynchronization() throws SyncConnectionException {
     boolean successOrNot;
     try {
-      successOrNot = serviceClient.merge();
+      successOrNot = serviceClient.load();
     } catch (TException e) {
       throw new SyncConnectionException(
           "can not finish sync process because sync receiver has broken 
down.", e);
diff --git a/iotdb/src/main/thrift/SyncServerService.thrift b/service-rpc/src/main/thrift/sync.thrift
similarity index 80%
rename from iotdb/src/main/thrift/SyncServerService.thrift
rename to service-rpc/src/main/thrift/sync.thrift
index d633d25..a947507 100755
--- a/iotdb/src/main/thrift/SyncServerService.thrift
+++ b/service-rpc/src/main/thrift/sync.thrift
@@ -16,23 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-namespace java org.apache.iotdb.db.sync.receiver
+namespace java org.apache.iotdb.service.sync.thrift
 
 typedef i32 int 
 typedef i16 short
 typedef i64 long
 
-enum SYNC_STATUS {
+enum SyncDataStatus {
   SUCCESS_STATUS,
-  SYNC_STATUS
-
+  PROCESSING_STATUS
 }
 
-service ServerService{
+service SyncService{
        bool getUUID(1:string uuid, 2:string address)
-       void getSchema(1:binary buff, 2:int status)
-       string receiveData(1:string md5, 2:list<string> filename, 3:binary 
buff, 4:int status)
+       void getSchema(1:binary buff, 2:SyncDataStatus status)
+       string receiveData(1:string md5, 2:list<string> filename, 3:binary 
buff, 4:SyncDataStatus status)
        bool load()
-       void afterReceiving()
+       void cleanUp()
        void init(1:string storageGroup)
 }
\ No newline at end of file

Reply via email to