This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch Sync-Reconstruct
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/Sync-Reconstruct by this push:
     new 3f73140  improve some improper implementations
3f73140 is described below

commit 3f731406e524843fd65d94bedc9d3d8a5bb71f3e
Author: lta <[email protected]>
AuthorDate: Wed Mar 20 22:01:08 2019 +0800

    improve some improper implementations
---
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  10 +-
 .../org/apache/iotdb/db/sync/conf/Constans.java    |   5 +
 .../iotdb/db/sync/conf/SyncSenderConfig.java       |  14 +-
 .../iotdb/db/sync/conf/SyncSenderDescriptor.java   |  18 +--
 .../iotdb/db/sync/receiver/ServerServiceImpl.java  | 162 +++++++++++----------
 .../iotdb/db/sync/sender/FileSenderImpl.java       |  97 ++++++++----
 .../org/apache/iotdb/db/utils/FilePathUtils.java   |  75 +++++-----
 .../java/org/apache/iotdb/db/utils/SyncUtils.java  |  23 ---
 service-rpc/src/main/thrift/sync.thrift            |   5 +-
 9 files changed, 215 insertions(+), 194 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 8da75f2..7ab5215 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -257,12 +257,10 @@ public class IoTDBDescriptor {
     } finally {
       // update all data seriesPath
       conf.updatePath();
-      if (inputStream != null) {
-        try {
-          inputStream.close();
-        } catch (IOException e) {
-          LOGGER.error("Fail to close config file input stream because ", e);
-        }
+      try {
+        inputStream.close();
+      } catch (IOException e) {
+        LOGGER.error("Fail to close config file input stream because ", e);
       }
     }
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java
index 23c8c54..f43207f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java
@@ -39,4 +39,9 @@ public class Constans {
    **/
   public static final int DATA_CHUNK_SIZE = 64 * 1024 * 1024;
 
+  /**
+   * Max try when syncing the same file to receiver fails.
+   */
+  public static final int MAX_SYNC_FILE_TRY = 10;
+
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java
index a454f50..50bd443 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.db.sync.conf;
 import java.io.File;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.metadata.MetadataConstant;
+import org.apache.iotdb.db.utils.FilePathUtils;
+import org.apache.iotdb.db.utils.FileUtils;
+import org.apache.iotdb.db.utils.SyncUtils;
 
 public class SyncSenderConfig {
 
@@ -38,10 +41,7 @@ public class SyncSenderConfig {
 
   public void init() {
     String metadataDirPath = 
IoTDBDescriptor.getInstance().getConfig().getMetadataDir();
-    if (metadataDirPath.length() > 0
-        && metadataDirPath.charAt(metadataDirPath.length() - 1) != 
File.separatorChar) {
-      metadataDirPath = metadataDirPath + File.separatorChar;
-    }
+    metadataDirPath = FilePathUtils.regularizePath(metadataDirPath);
     schemaPath = metadataDirPath + MetadataConstant.METADATA_LOG;
     if (dataDirectory.length() > 0
         && dataDirectory.charAt(dataDirectory.length() - 1) != 
File.separatorChar) {
@@ -55,11 +55,7 @@ public class SyncSenderConfig {
     snapshotPaths = new String[bufferwriteDirectory.length];
     for (int i = 0; i < bufferwriteDirectory.length; i++) {
       bufferwriteDirectory[i] = new 
File(bufferwriteDirectory[i]).getAbsolutePath();
-      if (bufferwriteDirectory[i].length() > 0
-          && bufferwriteDirectory[i].charAt(bufferwriteDirectory[i].length() - 
1)
-          != File.separatorChar) {
-        bufferwriteDirectory[i] = bufferwriteDirectory[i] + File.separatorChar;
-      }
+      bufferwriteDirectory[i] = 
FilePathUtils.regularizePath(bufferwriteDirectory[i]);
       snapshotPaths[i] = bufferwriteDirectory[i] + Constans.SYNC_CLIENT + 
File.separatorChar
           + Constans.DATA_SNAPSHOT_NAME
           + File.separatorChar;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java
index 9a8ed5d..99dcb56 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java
@@ -25,6 +25,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Properties;
 import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.utils.FilePathUtils;
+import org.apache.iotdb.db.utils.SyncUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -105,11 +107,7 @@ public class SyncSenderDescriptor {
       String[] iotdbBufferwriteDirectory = conf.getBufferwriteDirectory();
       String[] snapshots = new String[conf.getBufferwriteDirectory().length];
       for (int i = 0; i < conf.getBufferwriteDirectory().length; i++) {
-        if (iotdbBufferwriteDirectory[i].length() > 0
-            && 
iotdbBufferwriteDirectory[i].charAt(iotdbBufferwriteDirectory[i].length() - 1)
-            != File.separatorChar) {
-          iotdbBufferwriteDirectory[i] = iotdbBufferwriteDirectory[i] + 
File.separatorChar;
-        }
+        iotdbBufferwriteDirectory[i] = 
FilePathUtils.regularizePath(iotdbBufferwriteDirectory[i]);
         snapshots[i] = iotdbBufferwriteDirectory[i] + Constans.SYNC_CLIENT + 
File.separatorChar
             + Constans.DATA_SNAPSHOT_NAME + File.separatorChar;
       }
@@ -120,12 +118,10 @@ public class SyncSenderDescriptor {
     } catch (Exception e) {
       LOGGER.warn("Error format in config file because {}, use default 
configuration", e);
     } finally {
-      if (inputStream != null) {
-        try {
-          inputStream.close();
-        } catch (IOException e) {
-          LOGGER.error("Fail to close sync config file input stream because ", 
e);
-        }
+      try {
+        inputStream.close();
+      } catch (IOException e) {
+        LOGGER.error("Fail to close sync config file input stream because ", 
e);
       }
     }
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java
index fa072cb..7c938b3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java
@@ -29,14 +29,16 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.security.MessageDigest;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Objects;
 import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -54,6 +56,7 @@ import org.apache.iotdb.db.metadata.MetadataOperationType;
 import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.sync.conf.Constans;
+import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.db.utils.SyncUtils;
 import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
 import org.apache.iotdb.service.sync.thrift.SyncService;
@@ -137,10 +140,10 @@ public class ServerServiceImpl implements SyncService.Iface {
   private String syncDataPath;
 
   /**
-   * Init threadLocal variable
+   * Init threadLocal variable and delete old useless files.
    */
   @Override
-  public void init(String storageGroup) {
+  public boolean init(String storageGroup) {
     if (logger.isInfoEnabled()) {
       logger.info("Sync process starts to receive data of storage group {}", 
storageGroup);
     }
@@ -148,6 +151,26 @@ public class ServerServiceImpl implements SyncService.Iface {
     fileNodeMap.set(new HashMap<>());
     fileNodeStartTime.set(new HashMap<>());
     fileNodeEndTime.set(new HashMap<>());
+    try {
+      FileUtils.deleteDirectory(new File(syncDataPath));
+    } catch (IOException e) {
+      logger.error("cannot delete directory {} ", syncFolderPath);
+      return false;
+    }
+    for (String bufferWritePath : bufferWritePaths) {
+      bufferWritePath = FilePathUtils.regularizePath(bufferWritePath);
+      String backupPath = bufferWritePath + SYNC_SERVER + File.separator;
+      File backupDirectory = new File(backupPath, this.uuid.get());
+      if (backupDirectory.exists() && backupDirectory.list().length != 0) {
+        try {
+          FileUtils.deleteDirectory(backupDirectory);
+        } catch (IOException e) {
+          logger.error("cannot delete directory {} ", syncFolderPath);
+          return false;
+        }
+      }
+    }
+    return true;
   }
 
   /**
@@ -165,54 +188,38 @@ public class ServerServiceImpl implements SyncService.Iface {
    * Init file path and clear data if last sync process failed.
    */
   private void initPath() {
-    if (dataPath.length() > 0 && dataPath.charAt(dataPath.length() - 1) != 
File.separatorChar) {
-      dataPath = dataPath + File.separatorChar;
-    }
+    dataPath = FilePathUtils.regularizePath(dataPath);
     syncFolderPath = dataPath + SYNC_SERVER + File.separatorChar + 
this.uuid.get();
     syncDataPath = syncFolderPath + File.separatorChar + 
Constans.DATA_SNAPSHOT_NAME;
     schemaFromSenderPath
         .set(syncFolderPath + File.separator + MetadataConstant.METADATA_LOG);
-    File syncFileDirectory = new File(syncFolderPath, this.uuid.get());
-    if (syncFileDirectory.exists()
-        && Objects.requireNonNull(syncFileDirectory.list()).length != 0) {
-      SyncUtils.deleteFile(syncFileDirectory);
-    }
-    for (String bufferWritePath : bufferWritePaths) {
-      if (bufferWritePath.length() > 0
-          && bufferWritePath.charAt(bufferWritePath.length() - 1) != 
File.separatorChar) {
-        bufferWritePath = bufferWritePath + File.separatorChar;
-      }
-      String backupPath = bufferWritePath + SYNC_SERVER + File.separator;
-      File backupDirectory = new File(backupPath, this.uuid.get());
-      if (backupDirectory.exists() && 
Objects.requireNonNull(backupDirectory.list()).length != 0) {
-        /** if does not exist, it means that the last time sync failed, clear 
data in the uuid directory and receive the data again **/
-        SyncUtils.deleteFile(backupDirectory);
-      }
-    }
   }
 
   /**
    * Acquire schema from sender
    *
-   * @param status: SUCCESS_STATUS or PROCESSING_STATUS. status = 
SUCCESS_STATUS : finish receiving
-   * schema file, start to sync schema. status = SUCCESS_STATUS : the schema 
file has not received
-   * completely.
+   * @param status: FINIFSH_STATUS, SUCCESS_STATUS or PROCESSING_STATUS. 
status = FINISH_STATUS :
+   * finish receiving schema file, start to sync schema. status = 
PROCESSING_STATUS : the schema
+   * file has not received SUCCESS_STATUS: load metadata completely.
    */
   @Override
-  public void syncSchema(ByteBuffer schema, SyncDataStatus status) {
+  public String syncSchema(String md5, ByteBuffer schema, SyncDataStatus 
status) {
+    String md5OfReceiver = "";
     if (status == SyncDataStatus.SUCCESS_STATUS) {
       /** sync metadata, include storage group and timeseries **/
-      loadMetadata();
-    } else {
+      return Boolean.toString(loadMetadata());
+    } else if (status == SyncDataStatus.PROCESSING_STATUS) {
       File file = new File(schemaFromSenderPath.get());
       if (!file.getParentFile().exists()) {
         try {
           file.getParentFile().mkdirs();
           if (!file.createNewFile()) {
             logger.error("Cannot create file {}", file.getPath());
+            return null;
           }
         } catch (IOException e) {
           logger.error("Cannot make schema file {}.", file.getPath(), e);
+          return null;
         }
       }
       try (FileOutputStream fos = new FileOutputStream(file, true);
@@ -220,15 +227,31 @@ public class ServerServiceImpl implements SyncService.Iface {
         channel.write(schema);
       } catch (Exception e) {
         logger.error("Cannot write data to file {}.", file.getPath(), e);
+        return null;
+      }
+    } else {
+      try (FileInputStream fis = new 
FileInputStream(schemaFromSenderPath.get())) {
+        MessageDigest md = MessageDigest.getInstance("MD5");
+        byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
+        int n;
+        while ((n = fis.read(buffer)) != -1) {
+          md.update(buffer, 0, n);
+        }
+        md5OfReceiver = (new BigInteger(1, md.digest())).toString(16);
+        if (!md5.equals(md5OfReceiver)) {
+          FileUtils.forceDelete(new File(schemaFromSenderPath.get()));
+        }
+      } catch (Exception e) {
+        logger.error("Receiver cannot generate md5 {}", 
schemaFromSenderPath.get(), e);
       }
     }
-
+    return md5OfReceiver;
   }
 
   /**
    * Load metadata from sender
    */
-  private void loadMetadata() {
+  private boolean loadMetadata() {
     if (new File(schemaFromSenderPath.get()).exists()) {
       try (BufferedReader br = new BufferedReader(
           new java.io.FileReader(schemaFromSenderPath.get()))) {
@@ -239,12 +262,15 @@ public class ServerServiceImpl implements SyncService.Iface {
       } catch (FileNotFoundException e) {
         logger.error("Cannot read the file {}.",
             schemaFromSenderPath.get(), e);
+        return false;
       } catch (IOException e) {
         //TODO: how to deal with multiple insert schema
       } catch (Exception e) {
         logger.error("Parse metadata operation failed.", e);
+        return false;
       }
     }
+    return true;
   }
 
   /**
@@ -257,7 +283,7 @@ public class ServerServiceImpl implements SyncService.Iface {
     String[] args = cmd.trim().split(",");
     switch (args[0]) {
       case MetadataOperationType.ADD_PATH_TO_MTREE:
-        Map<String, String> props = null;
+        Map<String, String> props;
         String[] kv;
         props = new HashMap<>(args.length - 5 + 1, 1);
         for (int k = 5; k < args.length; k++) {
@@ -305,30 +331,17 @@ public class ServerServiceImpl implements SyncService.Iface {
   public String syncData(String md5OfSender, List<String> filePathSplit,
       ByteBuffer dataToReceive, SyncDataStatus status) {
     String md5OfReceiver = "";
-    StringBuilder filePathBuilder = new StringBuilder();
     FileChannel channel;
     /** Recombination File Path **/
-    for (int i = 0; i < filePathSplit.size(); i++) {
-      if (i == filePathSplit.size() - 1) {
-        filePathBuilder.append(filePathSplit.get(i));
-      } else {
-        filePathBuilder.append(filePathSplit.get(i)).append(File.separator);
-      }
-    }
-    String filePath = filePathBuilder.toString();
-    if (syncDataPath.length() > 0
-        && syncDataPath.charAt(syncDataPath.length() - 1) != 
File.separatorChar) {
-      syncDataPath = syncDataPath + File.separatorChar;
-    }
+    String filePath = StringUtils.join(filePathSplit, File.separatorChar);
+    syncDataPath = FilePathUtils.regularizePath(syncDataPath);
     filePath = syncDataPath + filePath;
     if (status == SyncDataStatus.PROCESSING_STATUS) { // there are still data 
stream to add
       File file = new File(filePath);
       if (!file.getParentFile().exists()) {
         try {
           file.getParentFile().mkdirs();
-          if (!file.createNewFile()) {
-            logger.error("cannot create file {}", file.getPath());
-          }
+          file.createNewFile();
         } catch (IOException e) {
           logger.error("cannot make file {}", file.getPath(), e);
         }
@@ -355,12 +368,10 @@ public class ServerServiceImpl implements SyncService.Iface {
             logger.info(String.format("Receiver has received %d files from 
sender", fileNum.get()));
           }
         } else {
-          if (!new File(filePath).delete()) {
-            logger.error("Receiver can not delete file {}", new 
File(filePath).getPath());
-          }
+          FileUtils.forceDelete(new File(filePath));
         }
       } catch (Exception e) {
-        logger.error("Receiver cannot generate md5", e);
+        logger.error("Receiver cannot generate md5 {}", filePath, e);
       }
     }
     return md5OfReceiver;
@@ -369,27 +380,20 @@ public class ServerServiceImpl implements SyncService.Iface {
 
   @Override
   public boolean load() {
-    getFileNodeInfo();
-    loadData();
-    SyncUtils.deleteFile(new File(syncDataPath));
-    for (String bufferWritePath : bufferWritePaths) {
-      if (bufferWritePath.length() > 0
-          && bufferWritePath.charAt(bufferWritePath.length() - 1) != 
File.separatorChar) {
-        bufferWritePath = bufferWritePath + File.separatorChar;
-      }
-      String backupPath = bufferWritePath + SYNC_SERVER + File.separator;
-      File backupDirectory = new File(backupPath, this.uuid.get());
-      if (backupDirectory.exists() && 
Objects.requireNonNull(backupDirectory.list()).length != 0) {
-        SyncUtils.deleteFile(backupDirectory);
-      }
+    try {
+      getFileNodeInfo();
+      loadData();
+    } catch (Exception e) {
+      logger.error("fail to load data", e);
+      return false;
     }
     return true;
   }
 
   /**
-   * Get all tsfiles' info which are sent from sender, it is prepare for 
merging these data
+   * Get all tsfiles' info which are sent from sender, it is preparing for 
merging these data
    */
-  public void getFileNodeInfo() {
+  public void getFileNodeInfo() throws IOException {
     File dataFileRoot = new File(syncDataPath);
     File[] files = dataFileRoot.listFiles();
     int processedNum = 0;
@@ -410,15 +414,17 @@ public class ServerServiceImpl implements SyncService.Iface {
             startTimeMap.put(key, device.getStartTime());
             endTimeMap.put(key, device.getEndTime());
           }
-        } catch (Exception e) {
-          logger.error("Unable to read tsfile {}", fileTF.getPath(), e);
+        } catch (IOException e) {
+          logger.error("Unable to read tsfile {}", fileTF.getPath());
+          throw new IOException(e);
         } finally {
           try {
             if (reader != null) {
               reader.close();
             }
           } catch (IOException e) {
-            logger.error("Cannot close tsfile stream {}", fileTF.getPath(), e);
+            logger.error("Cannot close tsfile stream {}", fileTF.getPath());
+            throw new IOException(e);
           }
         }
         fileNodeStartTime.get().put(fileTF.getPath(), startTimeMap);
@@ -440,11 +446,8 @@ public class ServerServiceImpl implements SyncService.Iface {
    * directly. If data in the tsfile is old, it has two strategy to merge.It 
depends on the
    * possibility of updating historical data.
    */
-  public void loadData() {
-    if (syncDataPath.length() > 0
-        && syncDataPath.charAt(syncDataPath.length() - 1) != 
File.separatorChar) {
-      syncDataPath = syncDataPath + File.separatorChar;
-    }
+  public void loadData() throws FileNodeManagerException {
+    syncDataPath = FilePathUtils.regularizePath(syncDataPath);
     int processedNum = 0;
     for (String storageGroup : fileNodeMap.get().keySet()) {
       List<String> filesPath = fileNodeMap.get().get(storageGroup);
@@ -500,7 +503,8 @@ public class ServerServiceImpl implements SyncService.Iface {
             }
           }
         } catch (FileNodeManagerException e) {
-          logger.error("Can not load external file ", e);
+          logger.error("Can not load external file {}", path);
+          throw new FileNodeManagerException(e);
         }
 
         processedNum++;
@@ -725,7 +729,11 @@ public class ServerServiceImpl implements SyncService.Iface {
     fileNodeStartTime.remove();
     fileNodeEndTime.remove();
     schemaFromSenderPath.remove();
-    SyncUtils.deleteFile(new File(syncFolderPath));
+    try {
+      FileUtils.deleteDirectory(new File(syncFolderPath));
+    } catch (IOException e) {
+      logger.error("can not delete directory {}", syncFolderPath, e);
+    }
     logger.info("Synchronization has finished!");
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java
index 5090fb1..bcd1b20 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java
@@ -42,6 +42,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
+import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.exception.SyncConnectionException;
 import org.apache.iotdb.db.sync.conf.Constans;
@@ -178,7 +179,7 @@ public class FileSenderImpl implements FileSender {
     for (String snapshotPath : config.getSnapshotPaths()) {
       if (new File(snapshotPath).exists() && new 
File(snapshotPath).list().length != 0) {
         /** It means that the last task of sync does not succeed! Clear the 
files and start to sync again **/
-        SyncUtils.deleteFile(new File(snapshotPath));
+        FileUtils.deleteDirectory(new File(snapshotPath));
       }
     }
 
@@ -195,7 +196,7 @@ public class FileSenderImpl implements FileSender {
     establishConnection(config.getServerIp(), config.getServerPort());
     if (!confirmIdentity(config.getUuidPath())) {
       LOGGER.error("Sorry, you do not have the permission to connect to sync 
receiver.");
-      return;
+      System.exit(1);
     }
 
     // 4. Create snapshot
@@ -205,15 +206,21 @@ public class FileSenderImpl implements FileSender {
 
     syncStatus = true;
 
-    // 5. Sync schema
-    syncSchema();
+    try{
+      // 5. Sync schema
+      syncSchema();
 
-    // 6. Sync data
-    syncAllData();
+      // 6. Sync data
+      syncAllData();
+    }catch (SyncConnectionException e){
+      LOGGER.error("cannot finish sync process", e);
+      syncStatus = false;
+      return;
+    }
 
     // 7. clear snapshot
     for (String snapshotPath : config.getSnapshotPaths()) {
-      SyncUtils.deleteFile(new File(snapshotPath));
+      FileUtils.deleteDirectory(new File(snapshotPath));
     }
 
     // 8. notify receiver that synchronization finish
@@ -238,7 +245,9 @@ public class FileSenderImpl implements FileSender {
       }
       LOGGER.info("Sync process starts to transfer data of storage group {}", 
entry.getKey());
       try {
-        serviceClient.init(entry.getKey());
+        if(!serviceClient.init(entry.getKey())){
+          throw new SyncConnectionException("unable init receiver");
+        }
       } catch (TException e) {
         throw new SyncConnectionException("Unable to connect to receiver", e);
       }
@@ -249,8 +258,7 @@ public class FileSenderImpl implements FileSender {
         fileManager.backupNowLocalFileInfo(config.getLastFileInfo());
         LOGGER.info("Sync process has finished storage group {}.", 
entry.getKey());
       } else {
-        throw new SyncConnectionException(
-            "Receiver cannot sync data, abandon this synchronization");
+        LOGGER.error("Receiver cannot sync data, abandon this synchronization 
of storage group {}", entry.getKey());
       }
     }
   }
@@ -365,8 +373,14 @@ public class FileSenderImpl implements FileSender {
           filePathSplit.add(name[name.length - 2]);
           filePathSplit.add(name[name.length - 1]);
         }
+        int retryCount = 0;
         while (true) {
           // Sync all data to receiver
+          if(retryCount > Constans.MAX_SYNC_FILE_TRY){
+            throw new SyncConnectionException(String
+                .format("can not sync file %s after %s tries.", 
snapshotFilePath,
+                    Constans.MAX_SYNC_FILE_TRY));
+          }
           byte[] buffer = new byte[Constans.DATA_CHUNK_SIZE];
           int data;
           try (FileInputStream fis = new FileInputStream(file)) {
@@ -392,11 +406,12 @@ public class FileSenderImpl implements FileSender {
           // the file is sent successfully
           String md5OfSender = (new BigInteger(1, md.digest())).toString(16);
           String md5OfReceiver = serviceClient.syncData(md5OfSender, 
filePathSplit,
-              null, SyncDataStatus.SUCCESS_STATUS);
+              null, SyncDataStatus.FINISH_STATUS);
           if (md5OfSender.equals(md5OfReceiver)) {
             LOGGER.info("Receiver has received {} successfully.", 
snapshotFilePath);
             break;
           }
+          retryCount++;
         }
         if (LOGGER.isInfoEnabled()) {
           LOGGER.info(String.format("Task of synchronization has completed 
%d/%d.", successNum,
@@ -413,24 +428,48 @@ public class FileSenderImpl implements FileSender {
    */
   @Override
   public void syncSchema() throws SyncConnectionException {
-    try (FileInputStream fis = new FileInputStream(new 
File(config.getSchemaPath()))) {
-      int mBufferSize = 4 * 1024 * 1024;
-      ByteArrayOutputStream bos = new ByteArrayOutputStream(mBufferSize);
-      byte[] buffer = new byte[mBufferSize];
-      int n;
-      while ((n = fis.read(buffer)) != -1) { // cut the file into pieces to 
send
-        bos.write(buffer, 0, n);
-        ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
-        bos.reset();
-        // 1 represents there is still schema buffer to send.
-        serviceClient.syncSchema(buffToSend, SyncDataStatus.PROCESSING_STATUS);
+    int retryCount = 0;
+    while (true) {
+      if (retryCount > Constans.MAX_SYNC_FILE_TRY) {
+        throw new SyncConnectionException(String
+            .format("can not sync schema after %s tries.", 
Constans.MAX_SYNC_FILE_TRY));
+      }
+      try (FileInputStream fis = new FileInputStream(new 
File(config.getSchemaPath()))) {
+        int mBufferSize = 4 * 1024 * 1024;
+        ByteArrayOutputStream bos = new ByteArrayOutputStream(mBufferSize);
+        byte[] buffer = new byte[mBufferSize];
+        int n;
+        while ((n = fis.read(buffer)) != -1) { // cut the file into pieces to 
send
+          bos.write(buffer, 0, n);
+          ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
+          bos.reset();
+          // PROCESSING_STATUS represents there is still schema buffer to send.
+          serviceClient.syncSchema(null, buffToSend, 
SyncDataStatus.PROCESSING_STATUS);
+        }
+        bos.close();
+        // Get md5 of the file.
+        fis.reset();
+        MessageDigest md = MessageDigest.getInstance("MD5");
+        int data;
+        while ((data = fis.read(buffer)) != -1) {
+          md.update(buffer, 0, data);
+        }
+        String md5OfSender = (new BigInteger(1, md.digest())).toString(16);
+        String md5OfReceiver = serviceClient
+            .syncSchema(md5OfSender, null, SyncDataStatus.FINISH_STATUS);
+        if (md5OfSender.equals(md5OfReceiver)) {
+          LOGGER.info("Receiver has received schema successfully.");
+          /** receiver start to load metadata **/
+          if(Boolean.parseBoolean(serviceClient.syncSchema(null, null, 
SyncDataStatus.SUCCESS_STATUS))){
+            throw new SyncConnectionException("receiver failed to load 
metadata");
+          }
+          break;
+        }
+        retryCount++;
+      }catch (Exception e) {
+        LOGGER.error("Cannot sync schema ", e);
+        throw new SyncConnectionException(e);
       }
-      bos.close();
-      // 0 represents the schema file has been transferred completely.
-      serviceClient.syncSchema(null, SyncDataStatus.SUCCESS_STATUS);
-    } catch (Exception e) {
-      LOGGER.error("Cannot sync schema ", e);
-      throw new SyncConnectionException(e);
     }
   }
 
@@ -479,7 +518,7 @@ public class FileSenderImpl implements FileSender {
           try {
             fileLock.release();
             randomAccessFile.close();
-            file.delete();
+            FileUtils.forceDelete(file);
           } catch (Exception e) {
             LOGGER.error("Unable to remove lock file: {}", lockFile, e);
           }
diff --git a/service-rpc/src/main/thrift/sync.thrift b/iotdb/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
old mode 100755
new mode 100644
similarity index 62%
copy from service-rpc/src/main/thrift/sync.thrift
copy to iotdb/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
index e139783..a797b5c
--- a/service-rpc/src/main/thrift/sync.thrift
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
@@ -1,37 +1,38 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-namespace java org.apache.iotdb.service.sync.thrift
-
-typedef i32 int 
-typedef i16 short
-typedef i64 long
-
-enum SyncDataStatus {
-  SUCCESS_STATUS,
-  PROCESSING_STATUS
-}
-
-service SyncService{
-       bool checkIdentity(1:string uuid, 2:string address)
-       void syncSchema(1:binary buff, 2:SyncDataStatus status)
-       string syncData(1:string md5, 2:list<string> filename, 3:binary buff, 
4:SyncDataStatus status)
-       bool load()
-       void cleanUp()
-       void init(1:string storageGroup)
-}
\ No newline at end of file
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils;
+
+import java.io.File;
+
+public class FilePathUtils {
+
+  /**
+   * Format file path to end with File.separator
+   * @param filePath origin file path
+   * @return Regularized Path
+   */
+  public static String regularizePath(String filePath){
+    if (filePath.length() > 0
+        && filePath.charAt(filePath.length() - 1) != File.separatorChar) {
+      filePath = filePath + File.separatorChar;
+    }
+    return filePath;
+  }
+
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
index 904b22c..d395038 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
@@ -22,7 +22,6 @@ import java.io.File;
 import java.text.DecimalFormat;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Objects;
 import java.util.Set;
 import org.apache.iotdb.db.sync.conf.SyncSenderDescriptor;
 
@@ -124,26 +123,4 @@ public class SyncUtils {
     ipAddressBinary = ipAddressBinary.substring(0, subnetMark);
     return ipAddressBinary.equals(ipSegmentBinary);
   }
-
-  /**
-   * Remove all files under this folder recursively
-   *
-   * @param file folder file
-   */
-  public static void deleteFile(File file) {
-    if (!file.exists()) {
-      return;
-    }
-    if (file.isFile() || Objects.requireNonNull(file.list()).length == 0) {
-      file.delete();
-    } else {
-      File[] files = file.listFiles();
-      assert files != null;
-      for (File f : files) {
-        deleteFile(f);
-        f.delete();
-      }
-      file.delete();
-    }
-  }
 }
diff --git a/service-rpc/src/main/thrift/sync.thrift b/service-rpc/src/main/thrift/sync.thrift
index e139783..56b2774 100755
--- a/service-rpc/src/main/thrift/sync.thrift
+++ b/service-rpc/src/main/thrift/sync.thrift
@@ -24,14 +24,15 @@ typedef i64 long
 
 enum SyncDataStatus {
   SUCCESS_STATUS,
+  FINISH_STATUS,
   PROCESSING_STATUS
 }
 
 service SyncService{
        bool checkIdentity(1:string uuid, 2:string address)
-       void syncSchema(1:binary buff, 2:SyncDataStatus status)
+       string syncSchema(1:string md5, 2:binary buff, 3:SyncDataStatus status)
        string syncData(1:string md5, 2:list<string> filename, 3:binary buff, 
4:SyncDataStatus status)
        bool load()
        void cleanUp()
-       void init(1:string storageGroup)
+       bool init(1:string storageGroup)
 }
\ No newline at end of file

Reply via email to