This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch Sync-Reconstruct in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 7471c48f426f89c241be3504c068ec92d1501e7d Author: lta <[email protected]> AuthorDate: Mon Mar 18 10:35:57 2019 +0800 test sync function and modify the implementation of singleton --- iotdb/iotdb/conf/iotdb-sync-client.properties | 11 ++- .../org/apache/iotdb/db/sync/conf/Constans.java | 1 + .../iotdb/db/sync/conf/SyncSenderConfig.java | 21 ++-- .../iotdb/db/sync/conf/SyncSenderDescriptor.java | 6 +- .../iotdb/db/sync/sender/FileSenderImpl.java | 110 ++++++++++++++------- 5 files changed, 90 insertions(+), 59 deletions(-) diff --git a/iotdb/iotdb/conf/iotdb-sync-client.properties b/iotdb/iotdb/conf/iotdb-sync-client.properties index bb70b2f..479d1c0 100644 --- a/iotdb/iotdb/conf/iotdb-sync-client.properties +++ b/iotdb/iotdb/conf/iotdb-sync-client.properties @@ -17,16 +17,19 @@ # under the License. # +# Sync server port address server_ip=127.0.0.1 -# PostBack server port address + +# Sync client port server_port=5555 -# PostBack client port -client_port=6666 + # The cycle time of post data back to receiver, the unit of time is second upload_cycle_in_seconds=600 -# Set bufferWrite data absolute path of IoTDB + +# Set bufferWrite data absolute path of IoTDB # It needs to be set with iotdb_schema_directory, they have to belong to the same IoTDB # iotdb_bufferWrite_directory = D:\\iotdb\\data\\data\\settled + # Set schema file absolute path of IoTDB # It needs to be set with iotdb_bufferWrite_directory, they have to belong to the same IoTDB # iotdb_schema_directory = D:\\iotdb\\data\\system\\schema\\mlog.txt diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java index 739bbbf..8e0556c 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java @@ -30,6 +30,7 @@ public class Constans { public static final String SYNC_CLIENT = "sync-client"; public static final String SYNC_SERVER = "sync-server"; + public static final String LOCK_FILE_NAME = "sync-lock"; public static final String UUID_FILE_NAME = "uuid.txt"; public static final String LAST_LOCAL_FILE_NAME = "last_local_files.txt"; public static final String DATA_SNAPSHOT_NAME = "data-snapshot"; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java index 05b4659..3fcdb15 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java @@ -30,15 +30,14 @@ public class SyncSenderConfig { private String[] bufferwriteDirectory = IoTDBDescriptor.getInstance().getConfig() .getBufferWriteDirs(); private String dataDirectory = IoTDBDescriptor.getInstance().getConfig().getDataDir(); + private String lockFilePath; private String uuidPath; private String lastFileInfo; private String[] snapshotPaths; private String schemaPath; private String serverIp = "127.0.0.1"; private int serverPort = 5555; - private int clientPort = 6666; private int uploadCycleInSeconds = 10; - private boolean clearEnable = false; public void init() { String metadataDirPath = IoTDBDescriptor.getInstance().getConfig().getMetadataDir(); @@ -51,6 +50,8 @@ public class SyncSenderConfig { && dataDirectory.charAt(dataDirectory.length() - 1) != File.separatorChar) { dataDirectory += File.separatorChar; } + lockFilePath = + dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.LOCK_FILE_NAME; uuidPath = dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.UUID_FILE_NAME; lastFileInfo = dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.LAST_LOCAL_FILE_NAME; @@ -133,14 +134,6 @@ public class SyncSenderConfig { this.serverPort = serverPort; } - public int getClientPort() { - return clientPort; - } - - public void setClientPort(int clientPort) { - this.clientPort = clientPort; - } - public int getUploadCycleInSeconds() { return uploadCycleInSeconds; } @@ -149,11 +142,11 @@ public class SyncSenderConfig { this.uploadCycleInSeconds = uploadCycleInSeconds; } - public boolean getClearEnable() { - return clearEnable; + public String getLockFilePath() { + return lockFilePath; } - public void setClearEnable(boolean clearEnable) { - this.clearEnable = clearEnable; + public void setLockFilePath(String lockFilePath) { + this.lockFilePath = lockFilePath; } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java index 9ca52c1..301e13f 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java @@ -89,9 +89,6 @@ public class SyncSenderDescriptor { conf.setServerIp(properties.getProperty("server_ip", conf.getServerIp())); conf.setServerPort(Integer .parseInt(properties.getProperty("server_port", Integer.toString(conf.getServerPort())))); - - conf.setClientPort(Integer - .parseInt(properties.getProperty("client_port", Integer.toString(conf.getClientPort())))); conf.setUploadCycleInSeconds(Integer.parseInt(properties .getProperty("upload_cycle_in_seconds", Integer.toString(conf.getUploadCycleInSeconds())))); @@ -106,7 +103,8 @@ public class SyncSenderDescriptor { conf.setUuidPath( dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.UUID_FILE_NAME); conf.setLastFileInfo( - dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + Constans.LAST_LOCAL_FILE_NAME); + dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + + Constans.LAST_LOCAL_FILE_NAME); String[] iotdbBufferwriteDirectory = conf.getBufferwriteDirectory(); String[] snapshots = new String[conf.getBufferwriteDirectory().length]; for (int i = 0; i < conf.getBufferwriteDirectory().length; i++) { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java index 0279033..8c95331 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java @@ -10,7 +10,6 @@ * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations @@ -25,11 +24,11 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.FileReader; import java.io.IOException; +import java.io.RandomAccessFile; import java.math.BigInteger; import java.net.InetAddress; -import java.net.ServerSocket; -import java.net.Socket; import java.nio.ByteBuffer; +import java.nio.channels.FileLock; import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; @@ -149,7 +148,7 @@ public class FileSenderImpl implements FileSender { throws InterruptedException, IOException, SyncConnectionException { Thread.currentThread().setName(ThreadName.SYNC_CLIENT.getName()); FileSenderImpl fileSenderImpl = new FileSenderImpl(); - fileSenderImpl.verifyPort(); + fileSenderImpl.verifySingleton(); fileSenderImpl.startMonitor(); fileSenderImpl.timedTask(); } @@ -198,23 +197,19 @@ public class FileSenderImpl implements FileSender { } } - syncStatus = true; - - // 2. Connect to sync server and Confirm Identity - establishConnection(config.getServerIp(), config.getServerPort()); - if (!confirmIdentity(config.getUuidPath())) { - LOGGER.error("Sorry, you do not have the permission to connect to sync receiver."); - syncStatus = false; - return; - } - - // 3. Acquire valid files and check + // 2. Acquire valid files and check fileManager.init(); validAllFiles = fileManager.getValidAllFiles(); currentLocalFiles = fileManager.getCurrentLocalFiles(); if (SyncUtils.isEmpty(validAllFiles)) { LOGGER.info("There has no file to sync !"); - syncStatus = false; + return; + } + + // 3. Connect to sync server and Confirm Identity + establishConnection(config.getServerIp(), config.getServerPort()); + if (!confirmIdentity(config.getUuidPath())) { + LOGGER.error("Sorry, you do not have the permission to connect to sync receiver."); return; } @@ -223,6 +218,8 @@ public class FileSenderImpl implements FileSender { validFileSnapshot.put(entry.getKey(), makeFileSnapshot(entry.getValue())); } + syncStatus = true; + // 5. Sync schema syncSchema(); @@ -463,32 +460,71 @@ public class FileSenderImpl implements FileSender { } /** - * The method is to verify whether the client port is bind or not, ensuring that only one client - * is running. + * The method is to verify whether the client lock file is locked or not, ensuring that only one + * client is running. + */ + private void verifySingleton() throws IOException { + File lockFile = new File(config.getLockFilePath()); + if (!lockFile.getParentFile().exists()) { + lockFile.getParentFile().mkdirs(); + } + if (!lockFile.exists()) { + lockFile.createNewFile(); + } + if (!lockInstance(config.getLockFilePath())) { + LOGGER.error("Sync client is running."); + System.exit(1); + } +// try { +// Socket socket = new Socket("localhost", config.getClientPort()); +// socket.close(); +// LOGGER.error("Sync client has been started!"); +// System.exit(1); +// } catch (IOException e) { +// try (ServerSocket listenerSocket = new ServerSocket(config.getClientPort())) { +// Thread listener = new Thread(() -> { +// try { +// while (true) { +// listenerSocket.accept(); +// } +// } catch (IOException e2) { +// LOGGER.error("Unable to listen to port{}", config.getClientPort(), e2); +// } +// }); +// listener.start(); +// } catch (IOException e1) { +// LOGGER.error("Unable to listen to port{}", config.getClientPort()); +// throw new IOException(); +// } +// } + } + + /** + * Try to lock lockfile. if failed, it means that sync client has benn started. + * + * @param lockFile path of lockfile */ - private void verifyPort() throws IOException { + private static boolean lockInstance(final String lockFile) { try { - Socket socket = new Socket("localhost", config.getClientPort()); - socket.close(); - LOGGER.error("Sync client has been started!"); - System.exit(0); - } catch (IOException e) { - try (ServerSocket listenerSocket = new ServerSocket(config.getClientPort())) { - Thread listener = new Thread(() -> { - while (true) { - try { - listenerSocket.accept(); - } catch (IOException e2) { - LOGGER.error("Unable to listen to port{}", config.getClientPort(), e2); - } + final File file = new File(lockFile); + final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw"); + final FileLock fileLock = randomAccessFile.getChannel().tryLock(); + if (fileLock != null) { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + fileLock.release(); + randomAccessFile.close(); + file.delete(); + } catch (Exception e) { + LOGGER.error("Unable to remove lock file: {}", lockFile, e); } - }); - listener.start(); - } catch (IOException e1) { - LOGGER.error("Unable to listen to port{}", config.getClientPort()); - throw new IOException(); + })); + return true; } + } catch (Exception e) { + LOGGER.error("Unable to create and/or lock file: {}", lockFile, e); } + return false; } private static class InstanceHolder {
