This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch Sync-Reconstruct
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 7471c48f426f89c241be3504c068ec92d1501e7d
Author: lta <[email protected]>
AuthorDate: Mon Mar 18 10:35:57 2019 +0800

    test sync function and modify the implementation of singleton
---
 iotdb/iotdb/conf/iotdb-sync-client.properties      |  11 ++-
 .../org/apache/iotdb/db/sync/conf/Constans.java    |   1 +
 .../iotdb/db/sync/conf/SyncSenderConfig.java       |  21 ++--
 .../iotdb/db/sync/conf/SyncSenderDescriptor.java   |   6 +-
 .../iotdb/db/sync/sender/FileSenderImpl.java       | 110 ++++++++++++++-------
 5 files changed, 90 insertions(+), 59 deletions(-)

diff --git a/iotdb/iotdb/conf/iotdb-sync-client.properties 
b/iotdb/iotdb/conf/iotdb-sync-client.properties
index bb70b2f..479d1c0 100644
--- a/iotdb/iotdb/conf/iotdb-sync-client.properties
+++ b/iotdb/iotdb/conf/iotdb-sync-client.properties
@@ -17,16 +17,19 @@
 # under the License.
 #
 
+# Sync server IP address
 server_ip=127.0.0.1
-# PostBack server port address
+
+# Sync server port
 server_port=5555
-# PostBack client port
-client_port=6666
+
 # The cycle time of post data back to receiver, the unit of time is second
 upload_cycle_in_seconds=600
-# Set bufferWrite data absolute path of IoTDB 
+
+# Set bufferWrite data absolute path of IoTDB
 # It needs to be set with iotdb_schema_directory, they have to belong to the 
same IoTDB
 # iotdb_bufferWrite_directory = D:\\iotdb\\data\\data\\settled
+
 # Set schema file absolute path of IoTDB
 # It needs to be set with iotdb_bufferWrite_directory, they have to belong to 
the same IoTDB
 # iotdb_schema_directory = D:\\iotdb\\data\\system\\schema\\mlog.txt
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java 
b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java
index 739bbbf..8e0556c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/Constans.java
@@ -30,6 +30,7 @@ public class Constans {
   public static final String SYNC_CLIENT = "sync-client";
   public static final String SYNC_SERVER = "sync-server";
 
+  public static final String LOCK_FILE_NAME = "sync-lock";
   public static final String UUID_FILE_NAME = "uuid.txt";
   public static final String LAST_LOCAL_FILE_NAME = "last_local_files.txt";
   public static final String DATA_SNAPSHOT_NAME = "data-snapshot";
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java 
b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java
index 05b4659..3fcdb15 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java
@@ -30,15 +30,14 @@ public class SyncSenderConfig {
   private String[] bufferwriteDirectory = 
IoTDBDescriptor.getInstance().getConfig()
       .getBufferWriteDirs();
   private String dataDirectory = 
IoTDBDescriptor.getInstance().getConfig().getDataDir();
+  private String lockFilePath;
   private String uuidPath;
   private String lastFileInfo;
   private String[] snapshotPaths;
   private String schemaPath;
   private String serverIp = "127.0.0.1";
   private int serverPort = 5555;
-  private int clientPort = 6666;
   private int uploadCycleInSeconds = 10;
-  private boolean clearEnable = false;
 
   public void init() {
     String metadataDirPath = 
IoTDBDescriptor.getInstance().getConfig().getMetadataDir();
@@ -51,6 +50,8 @@ public class SyncSenderConfig {
         && dataDirectory.charAt(dataDirectory.length() - 1) != 
File.separatorChar) {
       dataDirectory += File.separatorChar;
     }
+    lockFilePath =
+        dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + 
Constans.LOCK_FILE_NAME;
     uuidPath = dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + 
Constans.UUID_FILE_NAME;
     lastFileInfo =
         dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + 
Constans.LAST_LOCAL_FILE_NAME;
@@ -133,14 +134,6 @@ public class SyncSenderConfig {
     this.serverPort = serverPort;
   }
 
-  public int getClientPort() {
-    return clientPort;
-  }
-
-  public void setClientPort(int clientPort) {
-    this.clientPort = clientPort;
-  }
-
   public int getUploadCycleInSeconds() {
     return uploadCycleInSeconds;
   }
@@ -149,11 +142,11 @@ public class SyncSenderConfig {
     this.uploadCycleInSeconds = uploadCycleInSeconds;
   }
 
-  public boolean getClearEnable() {
-    return clearEnable;
+  public String getLockFilePath() {
+    return lockFilePath;
   }
 
-  public void setClearEnable(boolean clearEnable) {
-    this.clearEnable = clearEnable;
+  public void setLockFilePath(String lockFilePath) {
+    this.lockFilePath = lockFilePath;
   }
 }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java 
b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java
index 9ca52c1..301e13f 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java
@@ -89,9 +89,6 @@ public class SyncSenderDescriptor {
       conf.setServerIp(properties.getProperty("server_ip", 
conf.getServerIp()));
       conf.setServerPort(Integer
           .parseInt(properties.getProperty("server_port", 
Integer.toString(conf.getServerPort()))));
-
-      conf.setClientPort(Integer
-          .parseInt(properties.getProperty("client_port", 
Integer.toString(conf.getClientPort()))));
       conf.setUploadCycleInSeconds(Integer.parseInt(properties
           .getProperty("upload_cycle_in_seconds",
               Integer.toString(conf.getUploadCycleInSeconds()))));
@@ -106,7 +103,8 @@ public class SyncSenderDescriptor {
       conf.setUuidPath(
           dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + 
Constans.UUID_FILE_NAME);
       conf.setLastFileInfo(
-          dataDirectory + Constans.SYNC_CLIENT + File.separatorChar + 
Constans.LAST_LOCAL_FILE_NAME);
+          dataDirectory + Constans.SYNC_CLIENT + File.separatorChar
+              + Constans.LAST_LOCAL_FILE_NAME);
       String[] iotdbBufferwriteDirectory = conf.getBufferwriteDirectory();
       String[] snapshots = new String[conf.getBufferwriteDirectory().length];
       for (int i = 0; i < conf.getBufferwriteDirectory().length; i++) {
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java 
b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java
index 0279033..8c95331 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/sync/sender/FileSenderImpl.java
@@ -10,7 +10,6 @@
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
@@ -25,11 +24,11 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.math.BigInteger;
 import java.net.InetAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileLock;
 import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -149,7 +148,7 @@ public class FileSenderImpl implements FileSender {
       throws InterruptedException, IOException, SyncConnectionException {
     Thread.currentThread().setName(ThreadName.SYNC_CLIENT.getName());
     FileSenderImpl fileSenderImpl = new FileSenderImpl();
-    fileSenderImpl.verifyPort();
+    fileSenderImpl.verifySingleton();
     fileSenderImpl.startMonitor();
     fileSenderImpl.timedTask();
   }
@@ -198,23 +197,19 @@ public class FileSenderImpl implements FileSender {
       }
     }
 
-    syncStatus = true;
-
-    // 2. Connect to sync server and Confirm Identity
-    establishConnection(config.getServerIp(), config.getServerPort());
-    if (!confirmIdentity(config.getUuidPath())) {
-      LOGGER.error("Sorry, you do not have the permission to connect to sync 
receiver.");
-      syncStatus = false;
-      return;
-    }
-
-    // 3. Acquire valid files and check
+    // 2. Acquire valid files and check
     fileManager.init();
     validAllFiles = fileManager.getValidAllFiles();
     currentLocalFiles = fileManager.getCurrentLocalFiles();
     if (SyncUtils.isEmpty(validAllFiles)) {
       LOGGER.info("There has no file to sync !");
-      syncStatus = false;
+      return;
+    }
+
+    // 3. Connect to sync server and Confirm Identity
+    establishConnection(config.getServerIp(), config.getServerPort());
+    if (!confirmIdentity(config.getUuidPath())) {
+      LOGGER.error("Sorry, you do not have the permission to connect to sync 
receiver.");
       return;
     }
 
@@ -223,6 +218,8 @@ public class FileSenderImpl implements FileSender {
       validFileSnapshot.put(entry.getKey(), 
makeFileSnapshot(entry.getValue()));
     }
 
+    syncStatus = true;
+
     // 5. Sync schema
     syncSchema();
 
@@ -463,32 +460,71 @@ public class FileSenderImpl implements FileSender {
   }
 
   /**
-   * The method is to verify whether the client port is bind or not, ensuring 
that only one client
-   * is running.
+   * The method is to verify whether the client lock file is locked or not, 
ensuring that only one
+   * client is running.
+   */
+  private void verifySingleton() throws IOException {
+    File lockFile = new File(config.getLockFilePath());
+    if (!lockFile.getParentFile().exists()) {
+      lockFile.getParentFile().mkdirs();
+    }
+    if (!lockFile.exists()) {
+      lockFile.createNewFile();
+    }
+    if (!lockInstance(config.getLockFilePath())) {
+      LOGGER.error("Sync client is running.");
+      System.exit(1);
+    }
+//    try {
+//      Socket socket = new Socket("localhost", config.getClientPort());
+//      socket.close();
+//      LOGGER.error("Sync client has been started!");
+//      System.exit(1);
+//    } catch (IOException e) {
+//      try (ServerSocket listenerSocket = new 
ServerSocket(config.getClientPort())) {
+//        Thread listener = new Thread(() -> {
+//          try {
+//            while (true) {
+//              listenerSocket.accept();
+//            }
+//          } catch (IOException e2) {
+//            LOGGER.error("Unable to  listen to port{}", 
config.getClientPort(), e2);
+//          }
+//        });
+//        listener.start();
+//      } catch (IOException e1) {
+//        LOGGER.error("Unable to listen to port{}", config.getClientPort());
+//        throw new IOException();
+//      }
+//    }
+  }
+
+  /**
+   * Try to lock lockfile. If it fails, it means that sync client has been 
started.
+   *
+   * @param lockFile path of lockfile
    */
-  private void verifyPort() throws IOException {
+  private static boolean lockInstance(final String lockFile) {
     try {
-      Socket socket = new Socket("localhost", config.getClientPort());
-      socket.close();
-      LOGGER.error("Sync client has been started!");
-      System.exit(0);
-    } catch (IOException e) {
-      try (ServerSocket listenerSocket = new 
ServerSocket(config.getClientPort())) {
-        Thread listener = new Thread(() -> {
-          while (true) {
-            try {
-              listenerSocket.accept();
-            } catch (IOException e2) {
-              LOGGER.error("Unable to  listen to port{}", 
config.getClientPort(), e2);
-            }
+      final File file = new File(lockFile);
+      final RandomAccessFile randomAccessFile = new RandomAccessFile(file, 
"rw");
+      final FileLock fileLock = randomAccessFile.getChannel().tryLock();
+      if (fileLock != null) {
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+          try {
+            fileLock.release();
+            randomAccessFile.close();
+            file.delete();
+          } catch (Exception e) {
+            LOGGER.error("Unable to remove lock file: {}", lockFile, e);
           }
-        });
-        listener.start();
-      } catch (IOException e1) {
-        LOGGER.error("Unable to listen to port{}", config.getClientPort());
-        throw new IOException();
+        }));
+        return true;
       }
+    } catch (Exception e) {
+      LOGGER.error("Unable to create and/or lock file: {}", lockFile, e);
     }
+    return false;
   }
 
   private static class InstanceHolder {

Reply via email to