This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch Sync-Reconstruct in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 239174c6fb68614c6908413eaae245fbff4ade4a Author: lta <[email protected]> AuthorDate: Sun Mar 17 23:42:35 2019 +0800 test sync features --- iotdb/iotdb/bin/start-sync-client.bat | 4 +- iotdb/iotdb/bin/start-sync-client.sh | 4 +- iotdb/iotdb/bin/stop-sync-client.bat | 2 +- iotdb/iotdb/bin/stop-sync-client.sh | 2 +- iotdb/iotdb/conf/iotdb-engine.properties | 12 +-- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 20 ++-- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 14 +-- .../db/engine/filenode/FileNodeProcessor.java | 11 ++- .../org/apache/iotdb/db/sync/conf/Constans.java | 2 + .../iotdb/db/sync/conf/SyncSenderConfig.java | 24 ++--- .../iotdb/db/sync/conf/SyncSenderDescriptor.java | 11 +-- .../iotdb/db/sync/receiver/ServerManager.java | 13 +-- .../iotdb/db/sync/receiver/ServerServiceImpl.java | 109 ++++++++++----------- .../apache/iotdb/db/sync/sender/FileManager.java | 12 +-- .../iotdb/db/sync/sender/FileSenderImpl.java | 54 ++++------ .../java/org/apache/iotdb/db/utils/SyncUtils.java | 39 ++++---- .../iotdb/db/sync/sender/FileManagerTest.java | 37 +++---- ...stBackTest.java => MultipleClientSyncTest.java} | 8 +- ...PostBackTest.java => SingleClientSyncTest.java} | 53 +++++----- ...CreateDataSender1.java => SyncTestClient1.java} | 6 +- ...CreateDataSender2.java => SyncTestClient2.java} | 6 +- ...CreateDataSender3.java => SyncTestClient3.java} | 8 +- .../java/org/apache/iotdb/db/sync/test/Utils.java | 2 +- .../{start-postBackTest.sh => start-sync-test.sh} | 0 .../{stop-postBackTest.sh => stop-sync-test.sh} | 0 25 files changed, 224 insertions(+), 229 deletions(-) diff --git a/iotdb/iotdb/bin/start-sync-client.bat b/iotdb/iotdb/bin/start-sync-client.bat index e8b0e85..7d7fae6 100755 --- a/iotdb/iotdb/bin/start-sync-client.bat +++ b/iotdb/iotdb/bin/start-sync-client.bat @@ -29,7 +29,7 @@ set IOTDB_CONF=%IOTDB_HOME%\conf set IOTDB_LOGS=%IOTDB_HOME%\logs -if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.db.postback.sender.FileSenderImpl +if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.db.sync.sender.FileSenderImpl if NOT DEFINED JAVA_HOME goto :err @REM ----------------------------------------------------------------------------- @@ -46,7 +46,7 @@ set CLASSPATH="%IOTDB_HOME%\lib" REM For each jar in the IOTDB_HOME lib directory call append to build the CLASSPATH variable. for %%i in ("%IOTDB_HOME%\lib\*.jar") do call :append "%%i" -set CLASSPATH=%CLASSPATH%;postBackClient +set CLASSPATH=%CLASSPATH%;SyncClient goto okClasspath :append diff --git a/iotdb/iotdb/bin/start-sync-client.sh b/iotdb/iotdb/bin/start-sync-client.sh index eb3e5a4..4d1cfda 100755 --- a/iotdb/iotdb/bin/start-sync-client.sh +++ b/iotdb/iotdb/bin/start-sync-client.sh @@ -47,8 +47,8 @@ for f in ${IOTDB_HOME}/lib/*.jar; do CLASSPATH=${CLASSPATH}":"$f done -MAIN_CLASS=org.apache.iotdb.db.postback.sender.FileSenderImpl +MAIN_CLASS=org.apache.iotdb.db.sync.sender.FileSenderImpl -"$JAVA" -DIOTDB_HOME=${IOTDB_HOME} -DTSFILE_HOME=${IOTDB_HOME} -DIOTDB_CONF=${IOTDB_CONF} -Dlogback.configurationFile=${IOTDB_CONF}/logback.xml $IOTDB_DERBY_OPTS $IOTDB_JMX_OPTS -Dname=postBackClient -cp "$CLASSPATH" "$MAIN_CLASS" +"$JAVA" -DIOTDB_HOME=${IOTDB_HOME} -DTSFILE_HOME=${IOTDB_HOME} -DIOTDB_CONF=${IOTDB_CONF} -Dlogback.configurationFile=${IOTDB_CONF}/logback.xml $IOTDB_DERBY_OPTS $IOTDB_JMX_OPTS -Dname=SyncClient -cp "$CLASSPATH" "$MAIN_CLASS" exit $? diff --git a/iotdb/iotdb/bin/stop-sync-client.bat b/iotdb/iotdb/bin/stop-sync-client.bat index d7908ec..698bbbc 100755 --- a/iotdb/iotdb/bin/stop-sync-client.bat +++ b/iotdb/iotdb/bin/stop-sync-client.bat @@ -19,5 +19,5 @@ @echo off -wmic process where (commandline like "%%postBackClient%%" and not name="wmic.exe") delete +wmic process where (commandline like "%%SyncClient%%" and not name="wmic.exe") delete rem ps ax | grep -i 'postBackClient' | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM \ No newline at end of file diff --git a/iotdb/iotdb/bin/stop-sync-client.sh b/iotdb/iotdb/bin/stop-sync-client.sh index 9ff0ab7..6e95455 100755 --- a/iotdb/iotdb/bin/stop-sync-client.sh +++ b/iotdb/iotdb/bin/stop-sync-client.sh @@ -26,5 +26,5 @@ if [ -z "$PIDS" ]; then exit 1 else kill -s TERM $PIDS - echo "close PostBackClient" + echo "close SyncClient" fi diff --git a/iotdb/iotdb/conf/iotdb-engine.properties b/iotdb/iotdb/conf/iotdb-engine.properties index cdca52c..5c7046f 100644 --- a/iotdb/iotdb/conf/iotdb-engine.properties +++ b/iotdb/iotdb/conf/iotdb-engine.properties @@ -201,20 +201,20 @@ schema_manager_cache_size=300000 # Generally the default value 4MB is enough. max_log_entry_size=4194304 -# IoTDB postBack server properties +# IoTDB sync server properties # Whether to allow to post back, the default allowed -is_postback_enable=true +is_sync_enable=true -# PostBack server port address -postback_server_port=5555 +# Sync server port address +sync_server_port=5555 -# White IP list of Postback client. +# White IP list of Sync client. # Please use the form of network segment to present the range of IP, for example: 192.168.0.0/16 # If there are more than one IP segment, please separate them by commas # The default is to allow all IP to sync IP_white_list=0.0.0.0/0 -# Choose a postBack strategy of merging historical data: +# Choose a sync strategy of loading historical data: #1. It's more likely to update historical data, please choose "true". #2. It's more likely not to update historical data or you don't know exactly, please choose "false". update_historical_data_possibility=false diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 815e36b..18d68cd 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -255,11 +255,11 @@ public class IoTDBConfig { /** * Is this IoTDB instance a receiver of sync or not. */ - private boolean isPostbackEnable = true; + private boolean isSyncEnable = true; /** * If this IoTDB instance is a receiver of sync, set the server port. */ - private int postbackServerPort = 5555; + private int syncServerPort = 5555; /* * Set the language version when loading file including error information, default value is "EN" */ @@ -768,20 +768,20 @@ public class IoTDBConfig { this.maxLogEntrySize = maxLogEntrySize; } - public boolean isPostbackEnable() { - return isPostbackEnable; + public boolean isSyncEnable() { + return isSyncEnable; } - public void setPostbackEnable(boolean postbackEnable) { - isPostbackEnable = postbackEnable; + public void setSyncEnable(boolean syncEnable) { + isSyncEnable = syncEnable; } - public int getPostbackServerPort() { - return postbackServerPort; + public int getSyncServerPort() { + return syncServerPort; } - public void setPostbackServerPort(int postbackServerPort) { - this.postbackServerPort = postbackServerPort; + public void setSyncServerPort(int syncServerPort) { + this.syncServerPort = syncServerPort; } public String getLanguageVersion() { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 4108a2f..8da75f2 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -190,15 +190,15 @@ public class IoTDBDescriptor { properties.getProperty("overflow_file_size_threshold", Long.toString(conf.getOverflowFileSizeThreshold())).trim())); - conf.setPostbackEnable(Boolean - .parseBoolean(properties.getProperty("is_postback_enable", - Boolean.toString(conf.isPostbackEnable())))); - conf.setPostbackServerPort(Integer - .parseInt(properties.getProperty("postback_server_port", - Integer.toString(conf.getPostbackServerPort())).trim())); + conf.setSyncEnable(Boolean + .parseBoolean(properties.getProperty("is_sync_enable", + Boolean.toString(conf.isSyncEnable())))); + conf.setSyncServerPort(Integer + .parseInt(properties.getProperty("sync_server_port", + Integer.toString(conf.getSyncServerPort())).trim())); conf.setUpdate_historical_data_possibility(Boolean.parseBoolean( properties.getProperty("update_historical_data_possibility", - Boolean.toString(conf.isPostbackEnable())))); + Boolean.toString(conf.isSyncEnable())))); conf.setIpWhiteList(properties.getProperty("IP_white_list", conf.getIpWhiteList())); if (conf.getMemThresholdWarning() <= 0) { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java index be775a5..651e668 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java @@ -77,6 +77,7 @@ import org.apache.iotdb.db.monitor.StatMonitor; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.factory.SeriesReaderFactory; import org.apache.iotdb.db.query.reader.IReader; +import org.apache.iotdb.db.sync.conf.Constans; import org.apache.iotdb.db.utils.MemUtils; import org.apache.iotdb.db.utils.QueryUtils; import org.apache.iotdb.db.utils.TimeValuePair; @@ -724,8 +725,8 @@ public class FileNodeProcessor extends Processor implements IStatistic { newMultiPassLock.readLock().unlock(); newMultiPassTokenSet.remove(token); LOGGER.debug("Remove multi token:{}, nspath:{}, new set:{}, lock:{}", token, - getProcessorName(), - newMultiPassTokenSet, newMultiPassLock); + getProcessorName(), + newMultiPassTokenSet, newMultiPassLock); return true; } else if (oldMultiPassTokenSet != null && oldMultiPassTokenSet.contains(token)) { // remove token first, then unlock @@ -884,8 +885,10 @@ public class FileNodeProcessor extends Processor implements IStatistic { tsFileResource.getEndTime(entry.getKey()) >= entry.getValue() && tsFileResource.getStartTime(entry.getKey()) <= appendFile .getEndTime(entry.getKey())) { - String relativeFilePath = "sync" + File.separator + uuid + File.separator + "backup" - + File.separator + tsFileResource.getRelativePath(); + String relativeFilePath = + Constans.SYNC_SERVER + File.separatorChar + uuid + File.separatorChar + + Constans.BACK_UP_DIRECTORY_NAME + + File.separatorChar + tsFileResource.getRelativePath(); File newFile = new File( Directories.getInstance().getTsFileFolder(tsFileResource.getBaseDirIndex()), relativeFilePath); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java index cc2581a..739bbbf 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java @@ -34,6 +34,8 @@ public class Constans { public static final String LAST_LOCAL_FILE_NAME = "last_local_files.txt"; public static final String DATA_SNAPSHOT_NAME = "data-snapshot"; + public static final String BACK_UP_DIRECTORY_NAME = "backup"; + /** * Split data file , block size at each transmission **/ diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java index fe33f2e..05b4659 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java @@ -27,7 +27,7 @@ import org.apache.iotdb.db.metadata.MetadataConstant; */ public class SyncSenderConfig { - private String[] iotdbBufferwriteDirectory = IoTDBDescriptor.getInstance().getConfig() + private String[] bufferwriteDirectory = IoTDBDescriptor.getInstance().getConfig() .getBufferWriteDirs(); private String dataDirectory = IoTDBDescriptor.getInstance().getConfig().getDataDir(); private String uuidPath; @@ -54,27 +54,27 @@ public class SyncSenderConfig { uuidPath = dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.UUID_FILE_NAME; lastFileInfo = dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.LAST_LOCAL_FILE_NAME; - snapshotPaths = new String[iotdbBufferwriteDirectory.length]; - for (int i = 0; i < iotdbBufferwriteDirectory.length; i++) { - iotdbBufferwriteDirectory[i] = new File(iotdbBufferwriteDirectory[i]).getAbsolutePath(); - if (iotdbBufferwriteDirectory[i].length() > 0 - && iotdbBufferwriteDirectory[i].charAt(iotdbBufferwriteDirectory[i].length() - 1) + snapshotPaths = new String[bufferwriteDirectory.length]; + for (int i = 0; i < bufferwriteDirectory.length; i++) { + bufferwriteDirectory[i] = new File(bufferwriteDirectory[i]).getAbsolutePath(); + if (bufferwriteDirectory[i].length() > 0 + && bufferwriteDirectory[i].charAt(bufferwriteDirectory[i].length() - 1) != File.separatorChar) { - iotdbBufferwriteDirectory[i] = iotdbBufferwriteDirectory[i] + File.separatorChar; + bufferwriteDirectory[i] = bufferwriteDirectory[i] + File.separatorChar; } - snapshotPaths[i] = iotdbBufferwriteDirectory[i] + Constans.SYNC_CLIENT + File.separatorChar + snapshotPaths[i] = bufferwriteDirectory[i] + Constans.SYNC_CLIENT + File.separatorChar + Constans.DATA_SNAPSHOT_NAME + File.separatorChar; } } - public String[] getIotdbBufferwriteDirectory() { - return iotdbBufferwriteDirectory; + public String[] getBufferwriteDirectory() { + return bufferwriteDirectory; } - public void setIotdbBufferwriteDirectory(String[] iotdbBufferwriteDirectory) { - this.iotdbBufferwriteDirectory = iotdbBufferwriteDirectory; + public void setBufferwriteDirectory(String[] bufferwriteDirectory) { + this.bufferwriteDirectory = bufferwriteDirectory; } public String getDataDirectory() { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java index c04336f..9ca52c1 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java @@ -107,19 +107,18 @@ public class SyncSenderDescriptor { dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.UUID_FILE_NAME); conf.setLastFileInfo( dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.LAST_LOCAL_FILE_NAME); - String[] iotdbBufferwriteDirectory = conf.getIotdbBufferwriteDirectory(); - String[] snapshots = new String[conf.getIotdbBufferwriteDirectory().length]; - for (int i = 0; i < conf.getIotdbBufferwriteDirectory().length; i++) { + String[] iotdbBufferwriteDirectory = conf.getBufferwriteDirectory(); + String[] snapshots = new String[conf.getBufferwriteDirectory().length]; + for (int i = 0; i < conf.getBufferwriteDirectory().length; i++) { if (iotdbBufferwriteDirectory[i].length() > 0 && iotdbBufferwriteDirectory[i].charAt(iotdbBufferwriteDirectory[i].length() - 1) != File.separatorChar) { iotdbBufferwriteDirectory[i] = iotdbBufferwriteDirectory[i] + File.separatorChar; } snapshots[i] = iotdbBufferwriteDirectory[i] + Constans.SYNC_CLIENT + File.separatorChar - + Constans.DATA_SNAPSHOT_NAME - + File.separatorChar; + + Constans.DATA_SNAPSHOT_NAME + File.separatorChar; } - conf.setIotdbBufferwriteDirectory(iotdbBufferwriteDirectory); + conf.setBufferwriteDirectory(iotdbBufferwriteDirectory); conf.setSnapshotPaths(snapshots); } catch (IOException e) { LOGGER.warn("Cannot load config file because {}, use default configuration", e); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerManager.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerManager.java index a8baf2f..c723993 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerManager.java @@ -22,6 +22,7 @@ import org.apache.iotdb.db.concurrent.ThreadName; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StartupException; +import org.apache.iotdb.service.sync.thrift.SyncService; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TBinaryProtocol.Factory; @@ -58,7 +59,7 @@ public class ServerManager { Factory protocolFactory; TProcessor processor; TThreadPoolServer.Args poolArgs; - if (!conf.isPostbackEnable()) { + if (!conf.isSyncEnable()) { return; } try { @@ -68,9 +69,9 @@ public class ServerManager { return; } conf.setIpWhiteList(conf.getIpWhiteList().replaceAll(" ", "")); - serverTransport = new TServerSocket(conf.getPostbackServerPort()); + serverTransport = new TServerSocket(conf.getSyncServerPort()); protocolFactory = new TBinaryProtocol.Factory(); - processor = new ServerService.Processor<>(new ServerServiceImpl()); + processor = new SyncService.Processor<>(new ServerServiceImpl()); poolArgs = new TThreadPoolServer.Args(serverTransport); poolArgs.processor(processor); poolArgs.protocolFactory(protocolFactory); @@ -80,7 +81,7 @@ public class ServerManager { Thread syncServerThread = new Thread(syncServerRunnable, ThreadName.SYNC_SERVER.getName()); syncServerThread.start(); } catch (TTransportException e) { - throw new StartupException("cannot start sync server.", e); + throw new StartupException("Cannot start sync server.", e); } } @@ -88,10 +89,10 @@ public class ServerManager { * close sync receiver's server. */ public void closeServer() { - if (conf.isPostbackEnable() && poolServer != null) { + if (conf.isSyncEnable() && poolServer != null) { poolServer.stop(); serverTransport.close(); - LOGGER.info("stop sync server."); + LOGGER.info("Stop sync server."); } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java index 9674767..8d6661d 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/receiver/ServerServiceImpl.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Set; +import org.apache.iotdb.db.concurrent.ThreadName; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.directories.Directories; @@ -70,7 +71,6 @@ import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.common.RowRecord; import org.apache.iotdb.tsfile.read.expression.QueryExpression; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; -import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,27 +110,34 @@ public class ServerServiceImpl implements SyncService.Iface { private ThreadLocal<Integer> fileNum = new ThreadLocal<>(); /** - * IoTDB ioTDBConfig + * IoTDB config **/ - private IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig(); + private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); /** * IoTDB data directory **/ - private String dataPath = ioTDBConfig.getDataDir(); + private String dataPath = config.getDataDir(); /** * IoTDB multiple bufferWrite directory **/ - private String[] bufferWritePaths = ioTDBConfig.getBufferWriteDirs(); + private String[] bufferWritePaths = config.getBufferWriteDirs(); /** * The path to store metadata file of sender */ private ThreadLocal<String> schemaFromSenderPath = new ThreadLocal<>(); - /** Sync path of server **/ - private String syncPath; + /** + * Sync folder path of server + **/ + private String syncFolderPath; + + /** + * Sync data path of server + */ + private String syncDataPath; /** * Init threadLocal variable @@ -150,31 +157,28 @@ public class ServerServiceImpl implements SyncService.Iface { * Verify IP address of sender */ @Override - public boolean checkIdentity(String uuid, String ipAddress) throws TException { + public boolean checkIdentity(String uuid, String ipAddress) { + Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName()); this.uuid.set(uuid); initPath(); - return SyncUtils.verifyIPSegment(ioTDBConfig.getIpWhiteList(), ipAddress); + return SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress); } /** * Init file path and clear data if last sync process failed. */ - private void initPath() throws TException { + private void initPath() { if (dataPath.length() > 0 && dataPath.charAt(dataPath.length() - 1) != File.separatorChar) { dataPath = dataPath + File.separatorChar; } - syncPath = dataPath + SYNC_SERVER + File.separator; + syncFolderPath = dataPath + SYNC_SERVER + File.separatorChar + this.uuid.get(); + syncDataPath = syncFolderPath + File.separatorChar + Constans.DATA_SNAPSHOT_NAME; schemaFromSenderPath - .set(syncPath + this.uuid.get() + File.separator + MetadataConstant.METADATA_LOG); - File syncFileDirectory = new File(syncPath,this.uuid.get()); + .set(syncFolderPath + File.separator + MetadataConstant.METADATA_LOG); + File syncFileDirectory = new File(syncFolderPath, this.uuid.get()); if (syncFileDirectory.exists() && Objects.requireNonNull(syncFileDirectory.list()).length != 0) { - try { - SyncUtils.deleteFile(syncFileDirectory); - } catch (IOException e) { - logger.error("Cannot clear useless metadata file."); - throw new TException(e); - } + SyncUtils.deleteFile(syncFileDirectory); } for (String bufferWritePath : bufferWritePaths) { if (bufferWritePath.length() > 0 @@ -185,12 +189,7 @@ public class ServerServiceImpl implements SyncService.Iface { File backupDirectory = new File(backupPath, this.uuid.get()); if (backupDirectory.exists() && Objects.requireNonNull(backupDirectory.list()).length != 0) { /** if does not exist, it means that the last time sync failed, clear data in the uuid directory and receive the data again **/ - try { - SyncUtils.deleteFile(backupDirectory); - } catch (IOException e) { - logger.error("Cannot clear useless backup data file"); - throw new TException(e); - } + SyncUtils.deleteFile(backupDirectory); } } } @@ -198,8 +197,9 @@ public class ServerServiceImpl implements SyncService.Iface { /** * Acquire schema from sender * - * @param status: SUCCESS_STATUS or PROCESSING_STATUS. status = SUCCESS_STATUS : finish receiving schema file, start to sync schema. - * status = SUCCESS_STATUS : the schema file has not received completely. + * @param status: SUCCESS_STATUS or PROCESSING_STATUS. status = SUCCESS_STATUS : finish receiving + * schema file, start to sync schema. status = SUCCESS_STATUS : the schema file has not received + * completely. */ @Override public void syncSchema(ByteBuffer schema, SyncDataStatus status) { @@ -215,14 +215,14 @@ public class ServerServiceImpl implements SyncService.Iface { logger.error("Cannot create file {}", file.getPath()); } } catch (IOException e) { - logger.error("Cannot make schema file {}.", file.getPath(),e); + logger.error("Cannot make schema file {}.", file.getPath(), e); } } try (FileOutputStream fos = new FileOutputStream(file, true); FileChannel channel = fos.getChannel()) { channel.write(schema); } catch (Exception e) { - logger.error("Cannot write data to file {}.", file.getPath(),e); + logger.error("Cannot write data to file {}.", file.getPath(), e); } } @@ -243,7 +243,7 @@ public class ServerServiceImpl implements SyncService.Iface { logger.error("Cannot read the file {}.", schemaFromSenderPath.get(), e); } catch (IOException e) { - logger.error("Cannot insert schema to IoTDB.", e); + //TODO: how to deal with multiple insert schema } catch (Exception e) { logger.error("Parse metadata operation failed.", e); } @@ -301,8 +301,8 @@ public class ServerServiceImpl implements SyncService.Iface { /** * Start receiving tsfile from sender * - * @param status status = SUCCESS_STATUS : finish receiving one tsfile status = PROCESSING_STATUS : tsfile has not received - * completely. + * @param status status = SUCCESS_STATUS : finish receiving one tsfile status = PROCESSING_STATUS + * : tsfile has not received completely. */ @Override public String syncData(String md5OfSender, List<String> filePathSplit, @@ -310,7 +310,7 @@ public class ServerServiceImpl implements SyncService.Iface { String md5OfReceiver = ""; StringBuilder filePathBuilder = new StringBuilder(); FileChannel channel; - /**Recombination File Path**/ + /** Recombination File Path **/ for (int i = 0; i < filePathSplit.size(); i++) { if (i == filePathSplit.size() - 1) { filePathBuilder.append(filePathSplit.get(i)); @@ -319,10 +319,11 @@ public class ServerServiceImpl implements SyncService.Iface { } } String filePath = filePathBuilder.toString(); - if(syncPath.length() > 0 && syncPath.charAt(syncPath.length()-1)!=File.separatorChar){ - syncPath = syncPath+File.separatorChar; + if (syncDataPath.length() > 0 + && syncDataPath.charAt(syncDataPath.length() - 1) != File.separatorChar) { + syncDataPath = syncDataPath + File.separatorChar; } - filePath = syncPath + uuid.get() + File.separator + filePath; + filePath = syncDataPath + filePath; if (status == SyncDataStatus.PROCESSING_STATUS) { // there are still data stream to add File file = new File(filePath); if (!file.getParentFile().exists()) { @@ -339,7 +340,7 @@ public class ServerServiceImpl implements SyncService.Iface { channel = fos.getChannel(); channel.write(dataToReceive); } catch (IOException e) { - logger.error("cannot write data to file {}",file.getPath(), e); + logger.error("cannot write data to file {}", file.getPath(), e); } } else { // all data in the same file has received successfully @@ -370,14 +371,10 @@ public class ServerServiceImpl implements SyncService.Iface { @Override - public boolean load() throws TException { + public boolean load() { getFileNodeInfo(); loadData(); - try { - SyncUtils.deleteFile(new File(syncPath + this.uuid.get())); - } catch (IOException e) { - throw new TException(e); - } + SyncUtils.deleteFile(new File(syncDataPath)); for (String bufferWritePath : bufferWritePaths) { if (bufferWritePath.length() > 0 && bufferWritePath.charAt(bufferWritePath.length() - 1) != File.separatorChar) { @@ -386,12 +383,7 @@ public class ServerServiceImpl implements SyncService.Iface { String backupPath = bufferWritePath + SYNC_SERVER + File.separator; File backupDirectory = new File(backupPath, this.uuid.get()); if (backupDirectory.exists() && Objects.requireNonNull(backupDirectory.list()).length != 0) { - try { - SyncUtils.deleteFile(backupDirectory); - } catch (IOException e) { - logger.error("Cannot clear useless backup data file"); - throw new TException(e); - } + SyncUtils.deleteFile(backupDirectory); } } return true; @@ -401,7 +393,7 @@ public class ServerServiceImpl implements SyncService.Iface { * Get all tsfiles' info which are sent from sender, it is prepare for merging these data */ public void getFileNodeInfo() { - File dataFileRoot = new File(syncPath,this.uuid.get()); + File dataFileRoot = new File(syncDataPath); File[] files = dataFileRoot.listFiles(); int processedNum = 0; for (File storageGroupPB : files) { @@ -432,9 +424,9 @@ public class ServerServiceImpl implements SyncService.Iface { logger.error("Cannot close file stream {}", fileTF.getPath(), e); } } - fileNodeStartTime.get().put(fileTF.getAbsolutePath(), startTimeMap); - fileNodeEndTime.get().put(fileTF.getAbsolutePath(), endTimeMap); - filesPath.add(fileTF.getAbsolutePath()); + fileNodeStartTime.get().put(fileTF.getPath(), startTimeMap); + fileNodeEndTime.get().put(fileTF.getPath(), endTimeMap); + filesPath.add(fileTF.getPath()); processedNum++; if (logger.isInfoEnabled()) { logger.info(String @@ -452,8 +444,9 @@ public class ServerServiceImpl implements SyncService.Iface { * possibility of updating historical data. */ public void loadData() { - if(syncPath.length() > 0 && syncPath.charAt(syncPath.length()-1) != File.separatorChar){ - syncPath =syncPath + File.separatorChar; + if (syncDataPath.length() > 0 + && syncDataPath.charAt(syncDataPath.length() - 1) != File.separatorChar) { + syncDataPath = syncDataPath + File.separatorChar; } int processedNum = 0; for (String storageGroup : fileNodeMap.get().keySet()) { @@ -487,7 +480,7 @@ public class ServerServiceImpl implements SyncService.Iface { Map<String, Long> endTimeMap = fileNodeEndTime.get().get(path); // create a new fileNode - String header = syncPath + uuid.get() + File.separator; + String header = syncDataPath; String relativePath = path.substring(header.length()); TsFileResource fileNode = new TsFileResource(startTimeMap, endTimeMap, OverflowChangeType.NO_CHANGE, @@ -496,7 +489,7 @@ public class ServerServiceImpl implements SyncService.Iface { try { if (!fileNodeManager.appendFileToFileNode(storageGroup, fileNode, path)) { // it is a file with overflow data - if (ioTDBConfig.isUpdate_historical_data_possibility()) { + if (config.isUpdate_historical_data_possibility()) { loadOldData(path); } else { List<String> overlapFiles = fileNodeManager.getOverlapFilesFromFileNode( @@ -735,6 +728,7 @@ public class ServerServiceImpl implements SyncService.Iface { fileNodeStartTime.remove(); fileNodeEndTime.remove(); schemaFromSenderPath.remove(); + SyncUtils.deleteFile(new File(syncFolderPath)); logger.info("Synchronization has finished!"); } @@ -745,4 +739,5 @@ public class ServerServiceImpl implements SyncService.Iface { public void setFileNodeMap(Map<String, List<String>> fileNodeMap) { this.fileNodeMap.set(fileNodeMap); } + } \ No newline at end of file diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileManager.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileManager.java index d2d4258..220745c 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileManager.java @@ -97,7 +97,7 @@ public class FileManager { } } } - LOGGER.info("acquire list of valid files."); + LOGGER.info("Acquire list of valid files."); for (Entry<String, Set<String>> entry : validAllFiles.entrySet()) { for (String path : entry.getValue()) { LOGGER.info(path); @@ -117,10 +117,10 @@ public class FileManager { if (!file.exists()) { try { if (!file.createNewFile()) { - LOGGER.error("cannot create file {}", file.getAbsoluteFile()); + LOGGER.error("Cannot create file {}", file.getPath()); } } catch (IOException e) { - throw new IOException("cannot get last local file list", e); + throw new IOException("Cannot get last local file list", e); } } else { try (BufferedReader bf = new BufferedReader(new FileReader(file))) { @@ -129,7 +129,7 @@ public class FileManager { fileList.add(fileName); } } catch (IOException e) { - LOGGER.error("cannot get last local file list when reading file {}.", + LOGGER.error("Cannot get last local file list when reading file {}.", syncConfig.getLastFileInfo()); throw new IOException(e); } @@ -160,7 +160,7 @@ public class FileManager { for (File file : files) { if (!file.getPath().endsWith(RESTORE_SUFFIX) && !new File( file.getPath() + RESTORE_SUFFIX).exists()) { - currentLocalFiles.get(storageGroup.getName()).add(file.getAbsolutePath()); + currentLocalFiles.get(storageGroup.getName()).add(file.getPath()); } } } @@ -181,7 +181,7 @@ public class FileManager { } } } catch (IOException e) { - LOGGER.error("cannot back up now local file info", e); + LOGGER.error("Cannot back up current local file info", e); } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java index c099c56..0279033 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java @@ -194,12 +194,7 @@ public class FileSenderImpl implements FileSender { for (String snapshotPath : config.getSnapshotPaths()) { if (new File(snapshotPath).exists() && new File(snapshotPath).list().length != 0) { /** It means that the last task of sync does not succeed! Clear the files and start to sync again **/ - try { - SyncUtils.deleteFile(new File(snapshotPath)); - } catch (IOException e) { - LOGGER.error("can not delete file {}", snapshotPath); - throw new IOException(e); - } + SyncUtils.deleteFile(new File(snapshotPath)); } } @@ -236,11 +231,7 @@ public class FileSenderImpl implements FileSender { // 7. clear snapshot for (String snapshotPath : config.getSnapshotPaths()) { - try { - SyncUtils.deleteFile(new File(snapshotPath)); - } catch (IOException e) { - LOGGER.error("can not delete snapshot", e); - } + SyncUtils.deleteFile(new File(snapshotPath)); } // 8. notify receiver that synchronization finish @@ -248,10 +239,10 @@ public class FileSenderImpl implements FileSender { try { serviceClient.cleanUp(); } catch (TException e) { - LOGGER.error("unable to connect to receiver ", e); + LOGGER.error("Unable to connect to receiver.", e); } transport.close(); - LOGGER.info("sync process has finished"); + LOGGER.info("Sync process has finished."); syncStatus = false; } @@ -263,21 +254,21 @@ public class FileSenderImpl implements FileSender { if (validSnapshot.isEmpty()) { continue; } - LOGGER.info("sync process starts to transfer data of storage group {}", entry.getKey()); + LOGGER.info("Sync process starts to transfer data of storage group {}", entry.getKey()); try { serviceClient.init(entry.getKey()); } catch (TException e) { - throw new SyncConnectionException("unable to connect to receiver", e); + throw new SyncConnectionException("Unable to connect to receiver", e); } syncData(validSnapshot); if (afterSynchronization()) { currentLocalFiles.get(entry.getKey()).addAll(validFiles); fileManager.setCurrentLocalFiles(currentLocalFiles); fileManager.backupNowLocalFileInfo(config.getLastFileInfo()); - LOGGER.info("sync process has finished storage group {}.", entry.getKey()); + LOGGER.info("Sync process has finished storage group {}.", entry.getKey()); } else { throw new SyncConnectionException( - "receiver cannot sync data, abandon this synchronization"); + "Receiver cannot sync data, abandon this synchronization"); } } } @@ -297,7 +288,7 @@ public class FileSenderImpl implements FileSender { transport.open(); } catch (TTransportException e) { syncStatus = false; - LOGGER.error("cannot connect to server"); + LOGGER.error("Cannot connect to server"); throw new SyncConnectionException(e); } } @@ -313,20 +304,18 @@ public class FileSenderImpl implements FileSender { } if (!file.exists()) { try (FileOutputStream out = new FileOutputStream(file)) { - if (!file.createNewFile()) { - LOGGER.error("cannot create file {}", file.getPath()); - } + file.createNewFile(); uuid = generateUUID(); out.write(uuid.getBytes()); } catch (IOException e) { - LOGGER.error("cannot write UUID to file {}", file.getPath()); + LOGGER.error("Cannot write UUID to file {}", file.getPath()); throw new IOException(e); } } else { try (BufferedReader bf = new BufferedReader((new FileReader(uuidPath)))) { uuid = bf.readLine(); } catch (IOException e) { - LOGGER.error("cannot read UUID from file{}", file.getPath()); + LOGGER.error("Cannot read UUID from file{}", file.getPath()); throw new IOException(e); } } @@ -335,7 +324,7 @@ public class FileSenderImpl implements FileSender { legalConnection = serviceClient.checkIdentity(uuid, InetAddress.getLocalHost().getHostAddress()); } catch (Exception e) { - LOGGER.error("cannot confirm identity with receiver"); + LOGGER.error("Cannot confirm identity with receiver"); throw new SyncConnectionException(e); } return legalConnection; @@ -364,7 +353,7 @@ public class FileSenderImpl implements FileSender { Files.createLink(link, target); } } catch (IOException e) { - LOGGER.error("can not make fileSnapshot"); + LOGGER.error("Can not make fileSnapshot"); throw new IOException(e); } return validFilesSnapshot; @@ -421,7 +410,7 @@ public class FileSenderImpl implements FileSender { String md5OfReceiver = serviceClient.syncData(md5OfSender, filePathSplit, null, SyncDataStatus.SUCCESS_STATUS); if (md5OfSender.equals(md5OfReceiver)) { - LOGGER.info("receiver has received {} successfully.", snapshotFilePath); + LOGGER.info("Receiver has received {} successfully.", snapshotFilePath); break; } } @@ -431,7 +420,7 @@ public class FileSenderImpl implements FileSender { } } } catch (Exception e) { - throw new SyncConnectionException("cannot sync data with receiver.", e); + throw new SyncConnectionException("Cannot sync data with receiver.", e); } } @@ -456,7 +445,7 @@ public class FileSenderImpl implements FileSender { // 0 represents the schema file has been transferred completely. serviceClient.syncSchema(null, SyncDataStatus.SUCCESS_STATUS); } catch (Exception e) { - LOGGER.error("cannot sync schema ", e); + LOGGER.error("Cannot sync schema ", e); throw new SyncConnectionException(e); } } @@ -468,7 +457,7 @@ public class FileSenderImpl implements FileSender { successOrNot = serviceClient.load(); } catch (TException e) { throw new SyncConnectionException( - "can not finish sync process because sync receiver has broken down.", e); + "Can not finish sync process because sync receiver has broken down.", e); } return successOrNot; } @@ -481,7 +470,7 @@ public class FileSenderImpl implements FileSender { try { Socket socket = new Socket("localhost", config.getClientPort()); socket.close(); - LOGGER.error("The sync client has been started!"); + LOGGER.error("Sync client has been started!"); System.exit(0); } catch (IOException e) { try (ServerSocket listenerSocket = new ServerSocket(config.getClientPort())) { @@ -490,14 +479,13 @@ public class FileSenderImpl implements FileSender { try { listenerSocket.accept(); } catch (IOException e2) { - LOGGER.error("IoTDB sync sender: unable to listen to port{}", - config.getClientPort(), e2); + LOGGER.error("Unable to listen to port{}", config.getClientPort(), e2); } } }); listener.start(); } catch (IOException e1) { - LOGGER.error("unable to listen to port{}", config.getClientPort()); + LOGGER.error("Unable to listen to port{}", config.getClientPort()); throw new IOException(); } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java index 998aff2..a57d282 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java @@ -1,21 +1,24 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with the License. You may obtain - * a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations * under the License. */ package org.apache.iotdb.db.utils; import java.io.File; -import java.io.IOException; import java.text.DecimalFormat; import java.util.Map; import java.util.Map.Entry; @@ -61,7 +64,8 @@ public class SyncUtils { if (!new File(snapshotPath).exists()) { new File(snapshotPath).mkdir(); } - if(snapshotPath.length() > 0 && snapshotPath.charAt(snapshotPath.length()-1)!=File.separatorChar){ + if (snapshotPath.length() > 0 + && snapshotPath.charAt(snapshotPath.length() - 1) != File.separatorChar) { snapshotPath = snapshotPath + File.separatorChar; } return snapshotPath + relativeFilePath; @@ -129,25 +133,20 @@ public class SyncUtils { * * @param file folder file */ - public static void deleteFile(File file) throws IOException { + public static void deleteFile(File file) { if (!file.exists()) { return; } if (file.isFile() || Objects.requireNonNull(file.list()).length == 0) { - if (!file.delete()) { - throw new IOException( - String.format("cannot delete file : %s", file.getPath())); - } + file.delete(); } else { File[] files = file.listFiles(); assert files != null; for (File f : files) { deleteFile(f); - if (!f.delete()) { - throw new IOException( - String.format("cannot delete file : %s", f.getPath())); - } + f.delete(); } + file.delete(); } } } diff --git a/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/FileManagerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/FileManagerTest.java index a23bdbc..6e83e03 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/FileManagerTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/FileManagerTest.java @@ -33,6 +33,9 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * @author Tianan Li + */ public class FileManagerTest { public static final String POST_BACK_DIRECTORY_TEST = Constans.SYNC_CLIENT + File.separator; @@ -50,7 +53,7 @@ public class FileManagerTest { file.getParentFile().mkdirs(); } if (!file.exists() && !file.createNewFile()) { - LOGGER.error("Can not create new file {}", file.getAbsoluteFile()); + LOGGER.error("Can not create new file {}", file.getAbsolutePath()); } file = new File(SENDER_FILE_PATH_TEST); if (!file.exists()) { @@ -78,7 +81,7 @@ public class FileManagerTest { } @Test // It tests two classes : backupNowLocalFileInfo and getLastLocalFileList - public void testBackupNowLocalFileInfo() throws IOException { + public void testBackupCurrentLocalFileInfo() throws IOException { Map<String, Set<String>> allFileList = new HashMap<>(); Random r = new Random(0); @@ -89,14 +92,14 @@ public class FileManagerTest { } String rand = String.valueOf(r.nextInt(10000)); String fileName = - SENDER_FILE_PATH_TEST + File.separator + String.valueOf(i) + File.separator + rand; + SENDER_FILE_PATH_TEST + File.separator + i + File.separator + rand; File file = new File(fileName); allFileList.get(String.valueOf(i)).add(file.getAbsolutePath()); if (!file.getParentFile().exists()) { file.getParentFile().mkdirs(); } if (!file.exists() && !file.createNewFile()) { - LOGGER.error("Can not create new file {}", file.getAbsoluteFile()); + LOGGER.error("Can not create new file {}", file.getAbsolutePath()); } } } @@ -125,14 +128,14 @@ public class FileManagerTest { } String rand = String.valueOf(r.nextInt(10000)); String fileName = - SENDER_FILE_PATH_TEST + File.separator + String.valueOf(i) + File.separator + rand; + SENDER_FILE_PATH_TEST + File.separator + i + File.separator + rand; File file = new File(fileName); allFileList.get(String.valueOf(i)).add(file.getAbsolutePath()); if (!file.getParentFile().exists()) { file.getParentFile().mkdirs(); } if (!file.exists() && !file.createNewFile()) { - LOGGER.error("Can not create new file {}", file.getAbsoluteFile()); + LOGGER.error("Can not create new file {}", file.getAbsolutePath()); } } } @@ -163,7 +166,7 @@ public class FileManagerTest { } @Test - public void testGetNowLocalFileList() throws IOException { + public void testGetCurrentLocalFileList() throws IOException { Map<String, Set<String>> allFileList = new HashMap<>(); Map<String, Set<String>> fileList; @@ -188,7 +191,7 @@ public class FileManagerTest { file.getParentFile().mkdirs(); } if (!file.exists() && !file.createNewFile()) { - LOGGER.error("Can not create new file {}", file.getAbsoluteFile()); + LOGGER.error("Can not create new file {}", file.getAbsolutePath()); } } } @@ -226,14 +229,14 @@ public class FileManagerTest { } String rand = String.valueOf(r.nextInt(10000)); String fileName = - SENDER_FILE_PATH_TEST + File.separator + String.valueOf(i) + File.separator + rand; + SENDER_FILE_PATH_TEST + File.separator + i + File.separator + rand; File file = new File(fileName); allFileList.get(String.valueOf(i)).add(file.getAbsolutePath()); if (!file.getParentFile().exists()) { file.getParentFile().mkdirs(); } if (!file.exists() && !file.createNewFile()) { - LOGGER.error("Can not create new file {}", file.getAbsoluteFile()); + LOGGER.error("Can not create new file {}", file.getAbsolutePath()); } } } @@ -256,11 +259,11 @@ public class FileManagerTest { } @Test - public void testGetSendingFileList() throws IOException { + public void testGetValidFileList() throws IOException { Map<String, Set<String>> allFileList; Map<String, Set<String>> newFileList = new HashMap<>(); Map<String, Set<String>> sendingFileList; - Set<String> lastlocalList; + Set<String> lastLocalList; // nowSendingList is empty @@ -268,9 +271,9 @@ public class FileManagerTest { manager.getCurrentLocalFileList(new String[]{SENDER_FILE_PATH_TEST}); allFileList = manager.getCurrentLocalFiles(); manager.getLastLocalFileList(LAST_FILE_INFO_TEST); - lastlocalList = manager.getLastLocalFiles(); + lastLocalList = manager.getLastLocalFiles(); manager.getValidFileList(); - assert (lastlocalList.isEmpty()); + assert (lastLocalList.isEmpty()); assert (isEmpty(allFileList)); // add some files @@ -287,7 +290,7 @@ public class FileManagerTest { } String rand = String.valueOf(r.nextInt(10000)); String fileName = - SENDER_FILE_PATH_TEST + File.separator + String.valueOf(i) + File.separator + rand; + SENDER_FILE_PATH_TEST + File.separator + i + File.separator + rand; File file = new File(fileName); allFileList.get(String.valueOf(i)).add(file.getAbsolutePath()); newFileList.get(String.valueOf(i)).add(file.getAbsolutePath()); @@ -295,7 +298,7 @@ public class FileManagerTest { file.getParentFile().mkdirs(); } if (!file.exists() && !file.createNewFile()) { - LOGGER.error("Can not create new file {}", file.getAbsoluteFile()); + LOGGER.error("Can not create new file {}", file.getAbsolutePath()); } } } @@ -349,7 +352,7 @@ public class FileManagerTest { file.getParentFile().mkdirs(); } if (!file.exists() && !file.createNewFile()) { - LOGGER.error("Can not create new file {}", file.getAbsoluteFile()); + LOGGER.error("Can not create new file {}", file.getAbsolutePath()); } } } diff --git a/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/MultipleClientPostBackTest.java b/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/MultipleClientSyncTest.java similarity index 97% rename from iotdb/src/test/java/org/apache/iotdb/db/sync/sender/MultipleClientPostBackTest.java rename to iotdb/src/test/java/org/apache/iotdb/db/sync/sender/MultipleClientSyncTest.java index c988471..c334016 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/MultipleClientPostBackTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/MultipleClientSyncTest.java @@ -38,17 +38,17 @@ import org.apache.iotdb.jdbc.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MultipleClientPostBackTest { +public class MultipleClientSyncTest { Map<String, ArrayList<String>> timeseriesList = new HashMap(); Map<String, ArrayList<String>> timeseriesList1 = new HashMap(); - private static final Logger LOGGER = LoggerFactory.getLogger(MultipleClientPostBackTest.class); + private static final Logger LOGGER = LoggerFactory.getLogger(MultipleClientSyncTest.class); private Set<String> dataSender = new HashSet<>(); private Set<String> dataReceiver = new HashSet<>(); public static void main(String[] args) throws IOException { - MultipleClientPostBackTest multipleClientPostBackTest = new MultipleClientPostBackTest(); - multipleClientPostBackTest.testPostback(); + MultipleClientSyncTest multipleClientSyncTest = new MultipleClientSyncTest(); + multipleClientSyncTest.testPostback(); } public void testPostback() throws IOException { diff --git a/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/IoTDBSingleClientPostBackTest.java b/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java similarity index 94% rename from iotdb/src/test/java/org/apache/iotdb/db/sync/sender/IoTDBSingleClientPostBackTest.java rename to iotdb/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java index ccdafff..c5b8c3a 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/IoTDBSingleClientPostBackTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/sync/sender/SingleClientSyncTest.java @@ -34,23 +34,24 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.exception.SyncConnectionException; import org.apache.iotdb.db.integration.Constant; +import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.db.sync.conf.Constans; import org.apache.iotdb.db.sync.conf.SyncSenderConfig; import org.apache.iotdb.db.sync.conf.SyncSenderDescriptor; -import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.jdbc.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * The test is to run a complete sync function Before you run the test, make sure receiver has - * been cleaned up and inited. + * The test is to run a complete sync function Before you run the test, make sure receiver has been + * cleaned up and inited. */ -public class IoTDBSingleClientPostBackTest { +public class SingleClientSyncTest { FileSenderImpl fileSenderImpl = FileSenderImpl.getInstance(); private IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig(); - private String serverIpTest = "192.168.130.17"; + private String serverIpTest = "192.168.130.7"; private SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig(); private Set<String> dataSender = new HashSet<>(); private Set<String> dataReceiver = new HashSet<>(); @@ -145,11 +146,11 @@ public class IoTDBSingleClientPostBackTest { "insert into root.test.d0(timestamp,s1) values(3000,'1309')", "insert into root.test.d1.g0(timestamp,s0) values(400,1050)", "merge", "flush",}; private boolean testFlag = Constant.testFlag; - private static final String POSTBACK = "sync"; - private static final Logger logger = LoggerFactory.getLogger(IoTDBSingleClientPostBackTest.class); + private static final String SYNC_CLIENT = Constans.SYNC_CLIENT; + private static final Logger logger = LoggerFactory.getLogger(SingleClientSyncTest.class); public static void main(String[] args) throws Exception { - IoTDBSingleClientPostBackTest singleClientPostBackTest = new IoTDBSingleClientPostBackTest(); + SingleClientSyncTest singleClientPostBackTest = new SingleClientSyncTest(); singleClientPostBackTest.setUp(); singleClientPostBackTest.testPostback(); singleClientPostBackTest.tearDown(); @@ -157,21 +158,23 @@ public class IoTDBSingleClientPostBackTest { } public void setConfig() { - config.setUuidPath(config.getDataDirectory() + POSTBACK + File.separator + "uuid.txt"); + config.setUuidPath( + config.getDataDirectory() + SYNC_CLIENT + File.separator + Constans.UUID_FILE_NAME); config.setLastFileInfo( - config.getDataDirectory() + POSTBACK + File.separator + "lastLocalFileList.txt"); - String[] iotdbBufferwriteDirectory = config.getIotdbBufferwriteDirectory(); - String[] snapshots = new String[config.getIotdbBufferwriteDirectory().length]; - for (int i = 0; i < config.getIotdbBufferwriteDirectory().length; i++) { + config.getDataDirectory() + SYNC_CLIENT + File.separator + Constans.LAST_LOCAL_FILE_NAME); + String[] iotdbBufferwriteDirectory = config.getBufferwriteDirectory(); + String[] snapshots = new String[config.getBufferwriteDirectory().length]; + for (int i = 0; i < config.getBufferwriteDirectory().length; i++) { iotdbBufferwriteDirectory[i] = new File(iotdbBufferwriteDirectory[i]).getAbsolutePath(); if (!iotdbBufferwriteDirectory[i].endsWith(File.separator)) { iotdbBufferwriteDirectory[i] = iotdbBufferwriteDirectory[i] + File.separator; } - snapshots[i] = iotdbBufferwriteDirectory[i] + POSTBACK + File.separator + "dataSnapshot" - + File.separator; + snapshots[i] = + iotdbBufferwriteDirectory[i] + SYNC_CLIENT + File.separator + Constans.DATA_SNAPSHOT_NAME + + File.separator; } config.setSnapshotPaths(snapshots); - config.setIotdbBufferwriteDirectory(iotdbBufferwriteDirectory); + config.setBufferwriteDirectory(iotdbBufferwriteDirectory); config.setServerIp(serverIpTest); fileSenderImpl.setConfig(config); } @@ -204,7 +207,7 @@ public class IoTDBSingleClientPostBackTest { public void testPostback() throws IOException, SyncConnectionException { if (testFlag) { // the first time to sync - logger.debug("It's the first time to post back!"); + logger.debug("It's the first time to sync!"); try { Thread.sleep(2000); Class.forName(Config.JDBC_DRIVER_NAME); @@ -271,7 +274,8 @@ public class IoTDBSingleClientPostBackTest { Connection connection = null; try { connection = DriverManager - .getConnection("jdbc:iotdb://192.168.130.17:6667/", "root", "root"); + .getConnection(String.format("jdbc:iotdb://%s:6667/", serverIpTest), "root", + "root"); Statement statement = connection.createStatement(); boolean hasResultSet = statement.execute("select * from root.vehicle"); if (hasResultSet) { @@ -298,7 +302,7 @@ public class IoTDBSingleClientPostBackTest { } statement.close(); } catch (Exception e) { - logger.error("",e); + logger.error("", e); } finally { if (connection != null) { connection.close(); @@ -318,7 +322,7 @@ public class IoTDBSingleClientPostBackTest { } // the second time to sync - logger.debug("It's the second time to post back!"); + logger.debug("It's the second time to sync!"); try { Thread.sleep(2000); Class.forName(Config.JDBC_DRIVER_NAME); @@ -400,7 +404,8 @@ public class IoTDBSingleClientPostBackTest { Connection connection = null; try { connection = DriverManager - .getConnection("jdbc:iotdb://192.168.130.17:6667/", "root", "root"); + .getConnection(String.format("jdbc:iotdb://%s:6667/", serverIpTest), "root", + "root"); Statement statement = connection.createStatement(); boolean hasResultSet = statement.execute("select * from root.vehicle"); if (hasResultSet) { @@ -446,7 +451,7 @@ public class IoTDBSingleClientPostBackTest { } // the third time to sync - logger.debug("It's the third time to post back!"); + logger.debug("It's the third time to sync!"); try { Thread.sleep(2000); Class.forName(Config.JDBC_DRIVER_NAME); @@ -475,7 +480,7 @@ public class IoTDBSingleClientPostBackTest { Thread.sleep(2000); Class.forName(Config.JDBC_DRIVER_NAME); try (Connection connection = DriverManager - .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", + .getConnection(String.format("jdbc:iotdb://%s:6667/", serverIpTest), "root", "root")) { Statement statement = connection.createStatement(); boolean hasResultSet = statement.execute("select * from root.vehicle"); @@ -538,7 +543,7 @@ public class IoTDBSingleClientPostBackTest { Connection connection = null; try { connection = DriverManager - .getConnection("jdbc:iotdb://192.168.130.17:6667/", "root", "root"); + .getConnection("jdbc:iotdb://192.168.130.8:6667/", "root", "root"); Statement statement = connection.createStatement(); boolean hasResultSet = statement.execute("select * from root.vehicle"); if (hasResultSet) { diff --git a/iotdb/src/test/java/org/apache/iotdb/db/sync/test/CreateDataSender1.java b/iotdb/src/test/java/org/apache/iotdb/db/sync/test/SyncTestClient1.java similarity index 97% rename from iotdb/src/test/java/org/apache/iotdb/db/sync/test/CreateDataSender1.java rename to iotdb/src/test/java/org/apache/iotdb/db/sync/test/SyncTestClient1.java index 42f1965..35f2f1f 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/sync/test/CreateDataSender1.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/sync/test/SyncTestClient1.java @@ -37,12 +37,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * CreateDataSender1 is used to generate data of whole timeseries (simulating jilian scene) to test stability of + * SyncTestClient1 is used to generate data of whole timeseries (simulating jilian scene) to test stability of * sync function. * * @author Tianan Li */ -public class CreateDataSender1 { +public class SyncTestClient1 { private static final int TIME_INTERVAL = 0; private static final int TOTAL_DATA = 2000000; @@ -58,7 +58,7 @@ public class CreateDataSender1 { private static final int MAX_FLOAT = 30; private static final int STRING_LENGTH = 5; private static final int BATCH_SQL = 10000; - private static final Logger logger = LoggerFactory.getLogger(CreateDataSender1.class); + private static final Logger logger = LoggerFactory.getLogger(SyncTestClient1.class); /** * generate time series map from file. diff --git a/iotdb/src/test/java/org/apache/iotdb/db/sync/test/CreateDataSender2.java b/iotdb/src/test/java/org/apache/iotdb/db/sync/test/SyncTestClient2.java similarity index 97% rename from iotdb/src/test/java/org/apache/iotdb/db/sync/test/CreateDataSender2.java rename to iotdb/src/test/java/org/apache/iotdb/db/sync/test/SyncTestClient2.java index f335e5e..1f0b84c 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/sync/test/CreateDataSender2.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/sync/test/SyncTestClient2.java @@ -35,12 +35,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * CreateDataSender2 is used to generate data of half timeseries (simulating jilian scene) to test stability of + * SyncTestClient2 is used to generate data of half timeseries (simulating jilian scene) to test stability of * sync function. * * @author Tianan Li */ -public class CreateDataSender2 { +public class SyncTestClient2 { private static final int TIME_INTERVAL = 0; private static final int TOTAL_DATA = 2000000; @@ -56,7 +56,7 @@ public class CreateDataSender2 { private static final int MAX_FLOAT = 30; private static final int STRING_LENGTH = 5; private static final int BATCH_SQL = 10000; - private static final Logger LOGGER = LoggerFactory.getLogger(CreateDataSender2.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SyncTestClient2.class); /** * generate time series map from file. diff --git a/iotdb/src/test/java/org/apache/iotdb/db/sync/test/CreateDataSender3.java b/iotdb/src/test/java/org/apache/iotdb/db/sync/test/SyncTestClient3.java similarity index 96% rename from iotdb/src/test/java/org/apache/iotdb/db/sync/test/CreateDataSender3.java rename to iotdb/src/test/java/org/apache/iotdb/db/sync/test/SyncTestClient3.java index 83693ca..ed535fb 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/sync/test/CreateDataSender3.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/sync/test/SyncTestClient3.java @@ -35,12 +35,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * CreateDataSender3 is used to generate data of another half timeseries (simulating jilian scene) which is - * different to those in CreateDataSender2 to test stability of sync function. + * SyncTestClient3 is used to generate data of another half timeseries (simulating jilian scene) which is + * different to those in SyncTestClient2 to test stability of sync function. * * @author Tianan Li */ -public class CreateDataSender3 { +public class SyncTestClient3 { private static final int TIME_INTERVAL = 0; private static final int TOTAL_DATA = 2000000; @@ -56,7 +56,7 @@ public class CreateDataSender3 { private static final int MAX_FLOAT = 30; private static final int STRING_LENGTH = 5; private static final int BATCH_SQL = 10000; - private static final Logger LOGGER = LoggerFactory.getLogger(CreateDataSender3.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SyncTestClient3.class); /** * generate time series map from file. diff --git a/iotdb/src/test/java/org/apache/iotdb/db/sync/test/Utils.java b/iotdb/src/test/java/org/apache/iotdb/db/sync/test/Utils.java index 5f968d4..66322b7 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/sync/test/Utils.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/sync/test/Utils.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.sync.test; /** - * * @author Tianan Li + * @author Tianan Li */ public class Utils { diff --git a/iotdb/src/test/resources/start-postBackTest.sh b/iotdb/src/test/resources/start-sync-test.sh similarity index 100% rename from iotdb/src/test/resources/start-postBackTest.sh rename to iotdb/src/test/resources/start-sync-test.sh diff --git a/iotdb/src/test/resources/stop-postBackTest.sh b/iotdb/src/test/resources/stop-sync-test.sh similarity index 100% rename from iotdb/src/test/resources/stop-postBackTest.sh rename to iotdb/src/test/resources/stop-sync-test.sh
