This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch organize_properties
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 39e82b2df5cf6829ff318af3121eefc6c92f6c76
Author: lta <[email protected]>
AuthorDate: Mon Jun 3 15:04:45 2019 +0800

    organize properties
---
 iotdb/iotdb/conf/iotdb-engine.properties           | 44 ++++++++++----
 iotdb/iotdb/conf/iotdb-sync-client.properties      |  4 +-
 .../iotdb/db/sync/conf/SyncSenderConfig.java       | 24 +++++---
 .../iotdb/db/sync/conf/SyncSenderDescriptor.java   |  7 +--
 .../apache/iotdb/db/sync/sender/SyncSender.java    |  5 ++
 .../iotdb/db/sync/sender/SyncSenderImpl.java       | 68 +++++++++++++---------
 6 files changed, 101 insertions(+), 51 deletions(-)

diff --git a/iotdb/iotdb/conf/iotdb-engine.properties 
b/iotdb/iotdb/conf/iotdb-engine.properties
index c91cec7..c52f5ea 100644
--- a/iotdb/iotdb/conf/iotdb-engine.properties
+++ b/iotdb/iotdb/conf/iotdb-engine.properties
@@ -17,15 +17,22 @@
 # under the License.
 #
 
+####################
+### RPC Configuration
+####################
+
 rpc_address=0.0.0.0
 
 rpc_port=6667
 
-# Write ahead log configuration
+####################
+### Write Ahead Log Configuration
+####################
+
 # Is write ahead log enable
 enable_wal=true
 
-# When a certain amount ofwrite ahead log is reached, it will be flushed to 
disk
+# When a certain amount of write ahead log is reached, it will be flushed to 
disk
 # It is possible to lose at most flush_wal_threshold operations
 flush_wal_threshold=10000
 
@@ -38,7 +45,10 @@ flush_wal_period_in_ms=10
 # Set this parameter to 0 may slow down the ingestion on slow disk.
 force_wal_period_in_ms=10
 
-# database features configuration
+####################
+### Directory Configuration
+####################
+
 # data dir
 # If this property is unset, system will save the data in the default relative 
path directory under the IoTDB folder(i.e., %IOTDB_HOME%/data/data).
 # If it is absolute, system will save the data in exact location it points to.
@@ -100,6 +110,10 @@ force_wal_period_in_ms=10
 # If its prefix is "/", then the path is absolute. Otherwise, it is relative.
 # wal_dir=/path/iotdb/data
 
+####################
+### Memory Control Configuration
+####################
+
 # The maximum concurrent thread number for merging overflow
 # Increase this value, it will increase IO and CPU consumption
 # Decrease this value, when there is much overflow data, it will increase disk 
usage, which will reduce read speed
@@ -116,7 +130,7 @@ max_opened_folder=100
 # The amount of data that is read every time when IoTDB merge data.
 fetch_size=10000
 
-# The period time of flushing data from memory to file. 
+# The period time of flushing data from memory to file.
 # The unit is second.
 period_time_for_flush_in_second=3600
 
@@ -164,7 +178,10 @@ overflow_file_size_threshold=209715200
 # How many thread can concurrently flush. When <= 0, use CPU core number.
 concurrent_flush_thread=0
 
-# Statistics Monitor configuration
+####################
+### Statistics Monitor configuration
+####################
+
 # Set enable_stat_monitor true(or false) to enable(or disable) the StatMonitor 
that stores statistics info periodically.
 # back_loop_period_sec decides the period when StatMonitor writes statistics 
info into IoTDB.
 # stat_monitor_detect_freq_sec decides when IoTDB detects statistics info 
out-of-date.
@@ -202,11 +219,14 @@ schema_manager_cache_size=300000
 # Generally the default value 4MB is enough.
 max_log_entry_size=4194304
 
-# IoTDB sync server properties
-# Whether to allow to post back, the default allowed
+####################
+### Sync Server Configuration
+####################
+
+# Whether to open the sync_server_port for receiving data from sync clients; 
enabled by default
 is_sync_enable=true
 
-# Sync server port address
+# Sync server port to listen on
 sync_server_port=5555
 
 # White IP list of Sync client.
@@ -215,7 +235,9 @@ sync_server_port=5555
 # The default is to allow all IP to sync
 IP_white_list=0.0.0.0/0
 
-# Choose a sync strategy of loading historical data:
-#1. It's more likely to update historical data, please choose "true".
-#2. It's more likely not to update historical data or you don't know exactly, 
please choose "false". 
+# The processing strategy chosen by the sync server when merging the sync data.
+# 1. If the sync data accounts for more than 50% of the update of the 
historical data (compared with the latest timestamp of the local storage group 
data), then it is recommended to select strategy 1.
+#    Set the parameter to true; this has a greater impact on the write 
performance of the IoTDB system but occupies less CPU of the machine.
+# 2. If the sync data accounts for less than 50% of the update of the 
historical data (compared with the latest timestamp of the local storage group 
data), then it is recommended to select strategy 2.
+#    Set the parameter to false; this has little impact on the write 
performance of the IoTDB system but takes up a large amount of CPU power.
 update_historical_data_possibility=false
diff --git a/iotdb/iotdb/conf/iotdb-sync-client.properties 
b/iotdb/iotdb/conf/iotdb-sync-client.properties
index 479d1c0..72e31c5 100644
--- a/iotdb/iotdb/conf/iotdb-sync-client.properties
+++ b/iotdb/iotdb/conf/iotdb-sync-client.properties
@@ -23,8 +23,8 @@ server_ip=127.0.0.1
 # Sync client port
 server_port=5555
 
-# The cycle time of post data back to receiver, the unit of time is second
-upload_cycle_in_seconds=600
+# The period of the sync process, in seconds
+sync_period_in_seconds=600
 
 # Set bufferWrite data absolute path of IoTDB
 # It needs to be set with iotdb_schema_directory, they have to belong to the 
same IoTDB
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java 
b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java
index 50bd443..8c3ef1c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java
@@ -22,23 +22,33 @@ import java.io.File;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.metadata.MetadataConstant;
 import org.apache.iotdb.db.utils.FilePathUtils;
-import org.apache.iotdb.db.utils.FileUtils;
-import org.apache.iotdb.db.utils.SyncUtils;
 
 public class SyncSenderConfig {
 
   private String[] bufferwriteDirectory = 
IoTDBDescriptor.getInstance().getConfig()
       .getBufferWriteDirs();
+
   private String dataDirectory = 
IoTDBDescriptor.getInstance().getConfig().getDataDir();
+
   private String lockFilePath;
+
   private String uuidPath;
+
   private String lastFileInfo;
+
   private String[] snapshotPaths;
+
   private String schemaPath;
+
   private String serverIp = "127.0.0.1";
+
   private int serverPort = 5555;
-  private int uploadCycleInSeconds = 10;
 
+  private int syncPeriodInSeconds = 10;
+
+  /**
+   * Init path
+   */
   public void init() {
     String metadataDirPath = 
IoTDBDescriptor.getInstance().getConfig().getMetadataDir();
     metadataDirPath = FilePathUtils.regularizePath(metadataDirPath);
@@ -127,12 +137,12 @@ public class SyncSenderConfig {
     this.serverPort = serverPort;
   }
 
-  public int getUploadCycleInSeconds() {
-    return uploadCycleInSeconds;
+  public int getSyncPeriodInSeconds() {
+    return syncPeriodInSeconds;
   }
 
-  public void setUploadCycleInSeconds(int uploadCycleInSeconds) {
-    this.uploadCycleInSeconds = uploadCycleInSeconds;
+  public void setSyncPeriodInSeconds(int syncPeriodInSeconds) {
+    this.syncPeriodInSeconds = syncPeriodInSeconds;
   }
 
   public String getLockFilePath() {
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java 
b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java
index 99dcb56..b69434a 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java
@@ -26,7 +26,6 @@ import java.io.InputStream;
 import java.util.Properties;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.utils.FilePathUtils;
-import org.apache.iotdb.db.utils.SyncUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -88,9 +87,9 @@ public class SyncSenderDescriptor {
       conf.setServerIp(properties.getProperty("server_ip", 
conf.getServerIp()));
       conf.setServerPort(Integer
           .parseInt(properties.getProperty("server_port", 
Integer.toString(conf.getServerPort()))));
-      conf.setUploadCycleInSeconds(Integer.parseInt(properties
-          .getProperty("upload_cycle_in_seconds",
-              Integer.toString(conf.getUploadCycleInSeconds()))));
+      conf.setSyncPeriodInSeconds(Integer.parseInt(properties
+          .getProperty("sync_period_in_seconds",
+              Integer.toString(conf.getSyncPeriodInSeconds()))));
       conf.setSchemaPath(properties.getProperty("iotdb_schema_directory", 
conf.getSchemaPath()));
       conf.setDataDirectory(
           properties.getProperty("iotdb_bufferWrite_directory", 
conf.getDataDirectory()));
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/SyncSender.java 
b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/SyncSender.java
index 4ef1a3c..a0cfc3f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/SyncSender.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/SyncSender.java
@@ -62,4 +62,9 @@ public interface SyncSender {
    */
   void sync() throws SyncConnectionException, IOException;
 
+  /**
+   * Stop sync process
+   */
+  void stop();
+
 }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java 
b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java
index a20ff7d..5a02223 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java
@@ -42,7 +42,10 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.exception.SyncConnectionException;
 import org.apache.iotdb.db.sync.conf.Constans;
@@ -66,10 +69,19 @@ import org.slf4j.LoggerFactory;
 public class SyncSenderImpl implements SyncSender {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SyncSenderImpl.class);
+
   private TTransport transport;
+
   private SyncService.Client serviceClient;
+
   private List<String> schema = new ArrayList<>();
 
+  private static SyncSenderConfig config = 
SyncSenderDescriptor.getInstance().getConfig();
+
+  private static final long SYNC_DELAY = 0;
+
+  private static final long SYNC_PERIOD = config.getSyncPeriodInSeconds();
+
   /**
    * Files that need to be synchronized
    */
@@ -96,7 +108,8 @@ public class SyncSenderImpl implements SyncSender {
   private Map<String, Set<String>> validFileSnapshot = new HashMap<>();
 
   private SyncFileManager syncFileManager = SyncFileManager.getInstance();
-  private SyncSenderConfig config = 
SyncSenderDescriptor.getInstance().getConfig();
+
+  private ScheduledExecutorService executorService;
 
   /**
    * Monitor sync status.
@@ -109,7 +122,7 @@ public class SyncSenderImpl implements SyncSender {
         continue;
       }
       if ((currentTime.getTime() - lastSyncTime.getTime())
-          % (config.getUploadCycleInSeconds() * 1000) == 0) {
+          % (config.getSyncPeriodInSeconds() * 1000) == 0) {
         oldTime = currentTime;
         if (syncStatus) {
           LOGGER.info("Sync process is in execution!");
@@ -130,8 +143,7 @@ public class SyncSenderImpl implements SyncSender {
    *
    * @param args not used
    */
-  public static void main(String[] args)
-      throws InterruptedException, IOException, SyncConnectionException {
+  public static void main(String[] args) throws IOException {
     Thread.currentThread().setName(ThreadName.SYNC_CLIENT.getName());
     SyncSenderImpl fileSenderImpl = new SyncSenderImpl();
     fileSenderImpl.verifySingleton();
@@ -151,22 +163,22 @@ public class SyncSenderImpl implements SyncSender {
   /**
    * Start sync task in a certain time.
    */
-  public void timedTask() throws InterruptedException, 
SyncConnectionException, IOException {
-    sync();
-    lastSyncTime = new Date();
-    Date currentTime;
-    while (true) {
-      if (Thread.interrupted()) {
-        break;
-      }
-      Thread.sleep(2000);
-      currentTime = new Date();
-      if (currentTime.getTime() - lastSyncTime.getTime()
-          > config.getUploadCycleInSeconds() * 1000) {
-        lastSyncTime = currentTime;
+  public void timedTask() {
+    executorService = IoTDBThreadPoolFactory.newScheduledThreadPool(1,
+        "sync-client");
+    executorService.scheduleWithFixedDelay(() -> {
+      try {
         sync();
+      } catch (SyncConnectionException | IOException e) {
+        LOGGER.error("Sync failed", e);
+        stop();
       }
-    }
+    }, SYNC_DELAY, SYNC_PERIOD, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void stop() {
+    executorService.shutdownNow();
   }
 
   /**
@@ -178,7 +190,7 @@ public class SyncSenderImpl implements SyncSender {
     //1. Clear old snapshots if necessary
     for (String snapshotPath : config.getSnapshotPaths()) {
       if (new File(snapshotPath).exists() && new 
File(snapshotPath).list().length != 0) {
-        /** It means that the last task of sync does not succeed! Clear the 
files and start to sync again **/
+        // It means that the last task of sync does not succeed! Clear the 
files and start to sync again
         FileUtils.deleteDirectory(new File(snapshotPath));
       }
     }
@@ -206,13 +218,13 @@ public class SyncSenderImpl implements SyncSender {
 
     syncStatus = true;
 
-    try{
+    try {
       // 5. Sync schema
       syncSchema();
 
       // 6. Sync data
       syncAllData();
-    }catch (SyncConnectionException e){
+    } catch (SyncConnectionException e) {
       LOGGER.error("cannot finish sync process", e);
       syncStatus = false;
       return;
@@ -245,7 +257,7 @@ public class SyncSenderImpl implements SyncSender {
       }
       LOGGER.info("Sync process starts to transfer data of storage group {}", 
entry.getKey());
       try {
-        if(!serviceClient.init(entry.getKey())){
+        if (!serviceClient.init(entry.getKey())) {
           throw new SyncConnectionException("unable init receiver");
         }
       } catch (TException e) {
@@ -258,7 +270,8 @@ public class SyncSenderImpl implements SyncSender {
         syncFileManager.backupNowLocalFileInfo(config.getLastFileInfo());
         LOGGER.info("Sync process has finished storage group {}.", 
entry.getKey());
       } else {
-        LOGGER.error("Receiver cannot sync data, abandon this synchronization 
of storage group {}", entry.getKey());
+        LOGGER.error("Receiver cannot sync data, abandon this synchronization 
of storage group {}",
+            entry.getKey());
       }
     }
   }
@@ -380,7 +393,7 @@ public class SyncSenderImpl implements SyncSender {
         while (true) {
           retryCount++;
           // Sync all data to receiver
-          if(retryCount > Constans.MAX_SYNC_FILE_TRY){
+          if (retryCount > Constans.MAX_SYNC_FILE_TRY) {
             throw new SyncConnectionException(String
                 .format("can not sync file %s after %s tries.", 
snapshotFilePath,
                     Constans.MAX_SYNC_FILE_TRY));
@@ -395,8 +408,8 @@ public class SyncSenderImpl implements SyncSender {
               md.update(buffer, 0, dataLength);
               ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
               bos.reset();
-              if(!Boolean.parseBoolean(serviceClient
-                  .syncData(null, filePathSplit, buffToSend, 
SyncDataStatus.PROCESSING_STATUS))){
+              if (!Boolean.parseBoolean(serviceClient
+                  .syncData(null, filePathSplit, buffToSend, 
SyncDataStatus.PROCESSING_STATUS))) {
                 LOGGER.info("Receiver failed to receive data from {}, retry.", 
snapshotFilePath);
                 continue outer;
               }
@@ -445,7 +458,8 @@ public class SyncSenderImpl implements SyncSender {
           ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
           bos.reset();
           // PROCESSING_STATUS represents there is still schema buffer to send.
-          if(!Boolean.parseBoolean(serviceClient.syncSchema(null, buffToSend, 
SyncDataStatus.PROCESSING_STATUS))){
+          if (!Boolean.parseBoolean(
+              serviceClient.syncSchema(null, buffToSend, 
SyncDataStatus.PROCESSING_STATUS))) {
             LOGGER.error("Receiver failed to receive metadata, retry.");
             continue outer;
           }

Reply via email to