This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch organize_properties in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 39e82b2df5cf6829ff318af3121eefc6c92f6c76 Author: lta <[email protected]> AuthorDate: Mon Jun 3 15:04:45 2019 +0800 organize properties --- iotdb/iotdb/conf/iotdb-engine.properties | 44 ++++++++++---- iotdb/iotdb/conf/iotdb-sync-client.properties | 4 +- .../iotdb/db/sync/conf/SyncSenderConfig.java | 24 +++++--- .../iotdb/db/sync/conf/SyncSenderDescriptor.java | 7 +-- .../apache/iotdb/db/sync/sender/SyncSender.java | 5 ++ .../iotdb/db/sync/sender/SyncSenderImpl.java | 68 +++++++++++++--------- 6 files changed, 101 insertions(+), 51 deletions(-) diff --git a/iotdb/iotdb/conf/iotdb-engine.properties b/iotdb/iotdb/conf/iotdb-engine.properties index c91cec7..c52f5ea 100644 --- a/iotdb/iotdb/conf/iotdb-engine.properties +++ b/iotdb/iotdb/conf/iotdb-engine.properties @@ -17,15 +17,22 @@ # under the License. # +#################### +### RPC Configuration +#################### + rpc_address=0.0.0.0 rpc_port=6667 -# Write ahead log configuration +#################### +### Write Ahead Log Configuration +#################### + # Is write ahead log enable enable_wal=true -# When a certain amount ofwrite ahead log is reached, it will be flushed to disk +# When a certain amount of write ahead log is reached, it will be flushed to disk # It is possible to lose at most flush_wal_threshold operations flush_wal_threshold=10000 @@ -38,7 +45,10 @@ flush_wal_period_in_ms=10 # Set this parameter to 0 may slow down the ingestion on slow disk. force_wal_period_in_ms=10 -# database features configuration +#################### +### Directory Configuration +#################### + # data dir # If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/data/data). # If it is absolute, system will save the data in exact location it points to. @@ -100,6 +110,10 @@ force_wal_period_in_ms=10 # If its prefix is "/", then the path is absolute. Otherwise, it is relative. # wal_dir=/path/iotdb/data +#################### +### Memory Control Configuration +#################### + # The maximum concurrent thread number for merging overflow # Increase this value, it will increase IO and CPU consumption # Decrease this value, when there is much overflow data, it will increase disk usage, which will reduce read speed @@ -116,7 +130,7 @@ max_opened_folder=100 # The amount of data that is read every time when IoTDB merge data. fetch_size=10000 -# The period time of flushing data from memory to file. +# The period time of flushing data from memory to file. # The unit is second. period_time_for_flush_in_second=3600 @@ -164,7 +178,10 @@ overflow_file_size_threshold=209715200 # How many thread can concurrently flush. When <= 0, use CPU core number. concurrent_flush_thread=0 -# Statistics Monitor configuration +#################### +### Statistics Monitor configuration +#################### + # Set enable_stat_monitor true(or false) to enable(or disable) the StatMonitor that stores statistics info periodically. # back_loop_period_sec decides the period when StatMonitor writes statistics info into IoTDB. # stat_monitor_detect_freq_sec decides when IoTDB detects statistics info out-of-date. @@ -202,11 +219,14 @@ schema_manager_cache_size=300000 # Generally the default value 4MB is enough. max_log_entry_size=4194304 -# IoTDB sync server properties -# Whether to allow to post back, the default allowed +#################### +### Sync Server Configuration +#################### + +# Whether to open the sync_server_port for receiving data from sync client, the default allowed is_sync_enable=true -# Sync server port address +# Sync server port to listen sync_server_port=5555 # White IP list of Sync client. @@ -215,7 +235,9 @@ sync_server_port=5555 # The default is to allow all IP to sync IP_white_list=0.0.0.0/0 -# Choose a sync strategy of loading historical data: -#1. It's more likely to update historical data, please choose "true". -#2. It's more likely not to update historical data or you don't know exactly, please choose "false". +# The processing strategy chosen by the sync server when merging the sync data. +# 1. If the sync data accounts for more than 50% of the update of the historical data (compared with the latest timestamp of the local storage group data),then it is recommended to select strategy 1. +# Setting the parameter to true, which has a greater impact on the write performance of the IoTDB system and occupies less CPU of the machine. +# 2. If the sync data accounts for less than 50% of the update of the historical data (compared with the latest timestamp of the local storage group data),then it is recommended to select strategy 2. +# Setting the parameter to false, which has little impact on the write performance of IoTDB system and takes up a large amount of CPU power. update_historical_data_possibility=false diff --git a/iotdb/iotdb/conf/iotdb-sync-client.properties b/iotdb/iotdb/conf/iotdb-sync-client.properties index 479d1c0..72e31c5 100644 --- a/iotdb/iotdb/conf/iotdb-sync-client.properties +++ b/iotdb/iotdb/conf/iotdb-sync-client.properties @@ -23,8 +23,8 @@ server_ip=127.0.0.1 # Sync client port server_port=5555 -# The cycle time of post data back to receiver, the unit of time is second -upload_cycle_in_seconds=600 +# The period time of sync process, the unit of time is second +sync_period_in_seconds=600 # Set bufferWrite data absolute path of IoTDB # It needs to be set with iotdb_schema_directory, they have to belong to the same IoTDB diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java index 50bd443..8c3ef1c 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java @@ -22,23 +22,33 @@ import java.io.File; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.metadata.MetadataConstant; import org.apache.iotdb.db.utils.FilePathUtils; -import org.apache.iotdb.db.utils.FileUtils; -import org.apache.iotdb.db.utils.SyncUtils; public class SyncSenderConfig { private String[] bufferwriteDirectory = IoTDBDescriptor.getInstance().getConfig() .getBufferWriteDirs(); + private String dataDirectory = IoTDBDescriptor.getInstance().getConfig().getDataDir(); + private String lockFilePath; + private String uuidPath; + private String lastFileInfo; + private String[] snapshotPaths; + private String schemaPath; + private String serverIp = "127.0.0.1"; + private int serverPort = 5555; - private int uploadCycleInSeconds = 10; + private int syncPeriodInSeconds = 10; + + /** + * Init path + */ public void init() { String metadataDirPath = IoTDBDescriptor.getInstance().getConfig().getMetadataDir(); metadataDirPath = FilePathUtils.regularizePath(metadataDirPath); @@ -127,12 +137,12 @@ public class SyncSenderConfig { this.serverPort = serverPort; } - public int getUploadCycleInSeconds() { - return uploadCycleInSeconds; + public int getSyncPeriodInSeconds() { + return syncPeriodInSeconds; } - public void setUploadCycleInSeconds(int uploadCycleInSeconds) { - this.uploadCycleInSeconds = uploadCycleInSeconds; + public void setSyncPeriodInSeconds(int syncPeriodInSeconds) { + this.syncPeriodInSeconds = syncPeriodInSeconds; } public String getLockFilePath() { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java index 99dcb56..b69434a 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java @@ -26,7 +26,6 @@ import java.io.InputStream; import java.util.Properties; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.utils.FilePathUtils; -import org.apache.iotdb.db.utils.SyncUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,9 +87,9 @@ public class SyncSenderDescriptor { conf.setServerIp(properties.getProperty("server_ip", conf.getServerIp())); conf.setServerPort(Integer .parseInt(properties.getProperty("server_port", Integer.toString(conf.getServerPort())))); - conf.setUploadCycleInSeconds(Integer.parseInt(properties - .getProperty("upload_cycle_in_seconds", - Integer.toString(conf.getUploadCycleInSeconds())))); + conf.setSyncPeriodInSeconds(Integer.parseInt(properties + .getProperty("sync_period_in_seconds", + Integer.toString(conf.getSyncPeriodInSeconds())))); conf.setSchemaPath(properties.getProperty("iotdb_schema_directory", conf.getSchemaPath())); conf.setDataDirectory( properties.getProperty("iotdb_bufferWrite_directory", conf.getDataDirectory())); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/SyncSender.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/SyncSender.java index 4ef1a3c..a0cfc3f 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/SyncSender.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/SyncSender.java @@ -62,4 +62,9 @@ public interface SyncSender { */ void sync() throws SyncConnectionException, IOException; + /** + * Stop sync process + */ + void stop(); + } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java index a20ff7d..5a02223 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java @@ -42,7 +42,10 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; +import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.db.concurrent.ThreadName; import org.apache.iotdb.db.exception.SyncConnectionException; import org.apache.iotdb.db.sync.conf.Constans; @@ -66,10 +69,19 @@ import org.slf4j.LoggerFactory; public class SyncSenderImpl implements SyncSender { private static final Logger LOGGER = LoggerFactory.getLogger(SyncSenderImpl.class); + private TTransport transport; + private SyncService.Client serviceClient; + private List<String> schema = new ArrayList<>(); + private static SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig(); + + private static final long SYNC_DELAY = 0; + + private static final long SYNC_PERIOD = config.getSyncPeriodInSeconds(); + /** * Files that need to be synchronized */ @@ -96,7 +108,8 @@ public class SyncSenderImpl implements SyncSender { private Map<String, Set<String>> validFileSnapshot = new HashMap<>(); private SyncFileManager syncFileManager = SyncFileManager.getInstance(); - private SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig(); + + private ScheduledExecutorService executorService; /** * Monitor sync status. @@ -109,7 +122,7 @@ public class SyncSenderImpl implements SyncSender { continue; } if ((currentTime.getTime() - lastSyncTime.getTime()) - % (config.getUploadCycleInSeconds() * 1000) == 0) { + % (config.getSyncPeriodInSeconds() * 1000) == 0) { oldTime = currentTime; if (syncStatus) { LOGGER.info("Sync process is in execution!"); @@ -130,8 +143,7 @@ public class SyncSenderImpl implements SyncSender { * * @param args not used */ - public static void main(String[] args) - throws InterruptedException, IOException, SyncConnectionException { + public static void main(String[] args) throws IOException { Thread.currentThread().setName(ThreadName.SYNC_CLIENT.getName()); SyncSenderImpl fileSenderImpl = new SyncSenderImpl(); fileSenderImpl.verifySingleton(); @@ -151,22 +163,22 @@ public class SyncSenderImpl implements SyncSender { /** * Start sync task in a certain time. */ - public void timedTask() throws InterruptedException, SyncConnectionException, IOException { - sync(); - lastSyncTime = new Date(); - Date currentTime; - while (true) { - if (Thread.interrupted()) { - break; - } - Thread.sleep(2000); - currentTime = new Date(); - if (currentTime.getTime() - lastSyncTime.getTime() - > config.getUploadCycleInSeconds() * 1000) { - lastSyncTime = currentTime; + public void timedTask() { + executorService = IoTDBThreadPoolFactory.newScheduledThreadPool(1, + "sync-client"); + executorService.scheduleWithFixedDelay(() -> { + try { sync(); + } catch (SyncConnectionException | IOException e) { + LOGGER.error("Sync failed", e); + stop(); } - } + }, SYNC_DELAY, SYNC_PERIOD, TimeUnit.SECONDS); + } + + @Override + public void stop() { + executorService.shutdownNow(); } /** @@ -178,7 +190,7 @@ public class SyncSenderImpl implements SyncSender { //1. Clear old snapshots if necessary for (String snapshotPath : config.getSnapshotPaths()) { if (new File(snapshotPath).exists() && new File(snapshotPath).list().length != 0) { - /** It means that the last task of sync does not succeed! Clear the files and start to sync again **/ + // It means that the last task of sync does not succeed! Clear the files and start to sync again FileUtils.deleteDirectory(new File(snapshotPath)); } } @@ -206,13 +218,13 @@ public class SyncSenderImpl implements SyncSender { syncStatus = true; - try{ + try { // 5. Sync schema syncSchema(); // 6. Sync data syncAllData(); - }catch (SyncConnectionException e){ + } catch (SyncConnectionException e) { LOGGER.error("cannot finish sync process", e); syncStatus = false; return; @@ -245,7 +257,7 @@ public class SyncSenderImpl implements SyncSender { } LOGGER.info("Sync process starts to transfer data of storage group {}", entry.getKey()); try { - if(!serviceClient.init(entry.getKey())){ + if (!serviceClient.init(entry.getKey())) { throw new SyncConnectionException("unable init receiver"); } } catch (TException e) { @@ -258,7 +270,8 @@ public class SyncSenderImpl implements SyncSender { syncFileManager.backupNowLocalFileInfo(config.getLastFileInfo()); LOGGER.info("Sync process has finished storage group {}.", entry.getKey()); } else { - LOGGER.error("Receiver cannot sync data, abandon this synchronization of storage group {}", entry.getKey()); + LOGGER.error("Receiver cannot sync data, abandon this synchronization of storage group {}", + entry.getKey()); } } } @@ -380,7 +393,7 @@ public class SyncSenderImpl implements SyncSender { while (true) { retryCount++; // Sync all data to receiver - if(retryCount > Constans.MAX_SYNC_FILE_TRY){ + if (retryCount > Constans.MAX_SYNC_FILE_TRY) { throw new SyncConnectionException(String .format("can not sync file %s after %s tries.", snapshotFilePath, Constans.MAX_SYNC_FILE_TRY)); @@ -395,8 +408,8 @@ public class SyncSenderImpl implements SyncSender { md.update(buffer, 0, dataLength); ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray()); bos.reset(); - if(!Boolean.parseBoolean(serviceClient - .syncData(null, filePathSplit, buffToSend, SyncDataStatus.PROCESSING_STATUS))){ + if (!Boolean.parseBoolean(serviceClient + .syncData(null, filePathSplit, buffToSend, SyncDataStatus.PROCESSING_STATUS))) { LOGGER.info("Receiver failed to receive data from {}, retry.", snapshotFilePath); continue outer; } @@ -445,7 +458,8 @@ public class SyncSenderImpl implements SyncSender { ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray()); bos.reset(); // PROCESSING_STATUS represents there is still schema buffer to send. - if(!Boolean.parseBoolean(serviceClient.syncSchema(null, buffToSend, SyncDataStatus.PROCESSING_STATUS))){ + if (!Boolean.parseBoolean( + serviceClient.syncSchema(null, buffToSend, SyncDataStatus.PROCESSING_STATUS))) { LOGGER.error("Receiver failed to receive metadata, retry."); continue outer; }
